kloudbuster/monitor.py
#!/usr/bin/env python
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
'''
Module for parsing statistical output from Ganglia (gmond) server
The module opens a socket connection to collect statistical data.
It parses the raw data in xml format.
The data from ganglia/gmond is in a hierarchical xml format as below:
<CLUSTER>
<HOST..>
<METRIC ../>
<METRIC ../>
:
</HOST>
:
<HOST..>
<METRIC ../>
<METRIC ../>
</HOST>
</CLUSTER>
## Usage:
Using the module is simple.
1. Instantiate the Monitor with the gmond server IP and port to poll.
gmon = Monitor("172.22.191.151", 8649)
2. Start the monitoring thread
gmon.start_monitoring_thread(frequency, count)
< run tests/tasks>
gmon.stop_monitoring_thread()
3. Collecting stats:
cpu_metrics = gmon.build_cpu_metrics()
Returns a dictionary keyed by host IP, with the list of CPU
load samples collected for each node
'''
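# Illustrative fragment of the raw XML gmond returns (values are made up; only
# the attributes the parser below actually reads, e.g. NAME, IP and VAL, matter):
#
# <CLUSTER NAME="cluster-1" LOCALTIME="1423522440" URL="unspecified">
# <HOST NAME="node-1" IP="10.0.0.1" REPORTED="1423522430" TN="10"
# TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1423520000">
# <METRIC NAME="cpu_user" VAL="12.3"/>
# <METRIC NAME="cpu_system" VAL="3.1"/>
# <METRIC NAME="cpu_num" VAL="4"/>
# </HOST>
# </CLUSTER>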
import datetime
import re
import socket
import subprocess
from threading import Thread
import time
from lxml import etree
class MonitorExecutor(Thread):
'''
Thread handler class to asynchronously collect stats
'''
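# Lifecycle sketch: run() polls the gmond server 'count' times, sleeping
# 'freq' seconds between polls, and appends one parsed snapshot per poll
# to gmond_parsed_tree_list; set_force_stop() ends the loop early.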
THREAD_STOPPED = 0
THREAD_RUNNING = 1
def __init__(self, gmond_svr, gmond_port, freq=5, count=5):
super(MonitorExecutor, self).__init__()
self.gmond_svr_ip = gmond_svr
self.gmond_port = gmond_port
self.freq = freq
self.count = count
self.force_stop = False
self.thread_status = MonitorExecutor.THREAD_STOPPED
# This dictionary always holds the latest metric.
self.gmond_parsed_tree_list = []
def run(self):
'''
The thread runnable method.
The function will periodically poll the gmond server and
collect the metrics.
'''
self.thread_status = MonitorExecutor.THREAD_RUNNING
count = self.count
while count > 0:
if self.force_stop:
self.thread_status = MonitorExecutor.THREAD_STOPPED
return
self.parse_gmond_xml_data()
count -= 1
time.sleep(self.freq)
self.thread_status = MonitorExecutor.THREAD_STOPPED
def set_force_stop(self):
'''
Set the force-stop flag to stop the thread. By default
the thread stops once the specified number of iterations is reached.
'''
self.force_stop = True
def parse_gmond_xml_data(self):
'''
Parse gmond data (V2)
Retrieve the ganglia stats from the aggregation node, parse them and
append the resulting snapshot dictionary to gmond_parsed_tree_list.
Prints an error and returns without appending if no data is retrieved.
'''
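# Shape of the snapshot appended to gmond_parsed_tree_list (values illustrative):
# {
#     'CLUSTER-NAME': 'cluster-1',
#     'LOCALTIME': '1423522440',
#     'URL': 'unspecified',
#     'hosts': [{'NAME': 'node-1', 'IP': '10.0.0.1', 'REPORTED': '...',
#                'TN': '...', 'TMAX': '...', 'DMAX': '...', 'LOCATION': '...',
#                'GMOND_STARTED': '...',
#                'metrics': [{'NAME': 'cpu_user', 'VAL': '12.3'}, ...]}],
#     'dt': datetime.datetime(...)
# }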
gmond_parsed_tree = {}
raw_data = self.retrieve_stats_raw()
if raw_data is None or len(raw_data) == 0:
print "Failed to retrieve stats from server"
return
xtree = etree.XML(raw_data)
############################################
# Populate cluster information.
############################################
for elem in xtree.iter('CLUSTER'):
gmond_parsed_tree['CLUSTER-NAME'] = str(elem.get('NAME'))
gmond_parsed_tree['LOCALTIME'] = str(elem.get('LOCALTIME'))
gmond_parsed_tree['URL'] = str(elem.get('URL'))
host_list = []
for helem in elem.iterchildren():
host = {}
host['NAME'] = str(helem.get('NAME'))
host['IP'] = str(helem.get('IP'))
host['REPORTED'] = str(helem.get('REPORTED'))
host['TN'] = str(helem.get('TN'))
host['TMAX'] = str(helem.get('TMAX'))
host['DMAX'] = str(helem.get('DMAX'))
host['LOCATION'] = str(helem.get('LOCATION'))
host['GMOND_STARTED'] = str(helem.get('GMOND_STARTED'))
mlist = []
for metric in helem.iterchildren():
mdic = {}
mdic['NAME'] = str(metric.get('NAME'))
mdic['VAL'] = str(metric.get('VAL'))
mlist.append(mdic)
host['metrics'] = mlist
host_list.append(host)
gmond_parsed_tree['hosts'] = host_list
stat_dt = datetime.datetime.now()
gmond_parsed_tree['dt'] = stat_dt
self.gmond_parsed_tree_list.append(gmond_parsed_tree)
def retrieve_stats_raw(self):
'''
Retrieve stats from the gmond process.
'''
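# gmond is expected to dump its full XML state as soon as the TCP connection
# is established and then close the socket, so the loop below simply reads
# until recv() returns an empty string.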
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.settimeout(10)
try:
soc.connect((self.gmond_svr_ip, self.gmond_port))
except socket.error as exp:
print "Connection failure host: %s [%s]" % (self.gmond_svr_ip, exp)
return None
data = ""
while True:
try:
rbytes = soc.recv(4096)
except socket.error as exp:
print "Read failed for host: ", str(exp)
return None
if len(rbytes) == 0:
break
data += rbytes
soc.close()
return data
class Monitor(object):
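'''
Top-level helper: owns a MonitorExecutor polling thread and post-processes
the collected snapshots (per-host CPU series, formatted console dumps).
'''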
gmond_svr_ip = None
gmond_port = None
gmond_parsed_tree = {}
def __init__(self, gmond_svr, gmond_port=8649):
'''
The constructor simply sets the values of the gmond server and port.
'''
self.gmond_svr_ip = gmond_svr
self.gmond_port = gmond_port
# List of all stats.
self.gmond_parsed_tree_list = []
# series for all cpu loads
self.cpu_res = {}
self.mon_thread = None
def start_monitoring_thread(self, freq=10, count=10):
'''
Start the monitoring thread.
'''
self.mon_thread = MonitorExecutor(self.gmond_svr_ip,
self.gmond_port, freq, count)
self.mon_thread.start()
def stop_monitoring_thread(self):
self.mon_thread.set_force_stop()
self.gmond_parsed_tree_list = self.mon_thread.gmond_parsed_tree_list
def strip_raw_telnet_output(self, raw_data):
'''
When using the retrieve_stats_raw_telnet API, the raw data
has some additional text along with the xml data. We need to
strip that before we can pass it through the lxml parser.
'''
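# Illustrative raw input (the telnet client banner lines are dropped and
# everything from the '<?xml version' line onwards is kept):
#
# Trying 172.22.191.151...
# Connected to 172.22.191.151.
# Escape character is '^]'.
# <?xml version="1.0" encoding="..."?>
# ... (xml data as described in the module docstring) ...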
data = ""
xml_flag = False
for line in raw_data.splitlines():
if re.match(r".*<?xml version.*", line):
xml_flag = True
if xml_flag:
data += line + "\n"
return data
def retrieve_stats_raw_telnet(self):
'''
This retrieval method creates a subprocess and executes the
telnet command against the gmond port to retrieve the raw xml data.
'''
cmd = "telnet " + self.gmond_svr_ip + " " + str(self.gmond_port)
print "cmd: ", cmd
port = str(self.gmond_port)
proc = subprocess.Popen(["telnet", self.gmond_svr_ip, port],
stdout=subprocess.PIPE)
(output, _) = proc.communicate()
newout = self.strip_raw_telnet_output(output)
return newout
def get_host_list(self, gmond_parsed_tree):
'''
Returns all the host dictionaries as a list.
'''
return gmond_parsed_tree['hosts']
def get_metric_value(self, parsed_node, host_name, name):
'''
The function returns the value of a specific metric, given
the host name and the metric name to collect.
'''
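# Example (hypothetical host/metric names):
#   get_metric_value(parsed_node, 'node-1', 'cpu_user')  # -> '12.3' (a string)
# Returns 0 when the host or metric is not found.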
for host in parsed_node['hosts']:
if host['NAME'] == host_name:
for metric in host['metrics']:
if metric['NAME'] == name:
return metric['VAL']
return 0
def get_aggregate_cpu_usage(self, parsed_node, host_name):
'''
The function returns the aggregate CPU usage for a specific host.
equation: (cpu_user + cpu_system) * cpu_num / 100
'''
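# Worked example (made-up values): cpu_user=12.0, cpu_system=3.0, cpu_num=4
# gives (12.0 + 3.0) * 4 / 100 = 0.6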
cpu_user = float(self.get_metric_value(parsed_node, host_name, "cpu_user"))
cpu_system = float(self.get_metric_value(parsed_node, host_name, "cpu_system"))
cpu_num = int(self.get_metric_value(parsed_node, host_name, "cpu_num"))
return (cpu_user + cpu_system) * cpu_num / 100
def build_cpu_metrics(self):
'''Add a new set of cpu metrics to the results dictionary self.cpu_res
The result dest dictionary should look like this:
key = host IP, value = list of cpu loads where the
first value is the baseline value followed by 1 or more
values collected during the test
{
'10.0.0.1': [ 0.03, 1.23, 1.20 ],
'10.0.0.2': [ 0.10, 1.98, 2.72 ]
}
After another xml is decoded:
{
'10.0.0.1': [ 0.03, 1.23, 1.20, 1.41 ],
'10.0.0.2': [ 0.10, 1.98, 2.72, 2.04 ]
}
Each value in the list is the cpu load calculated as
(cpu_user + cpu_system) * num_cpu / 100
The load_five metric cannot be used as it is the average over the last 5 minutes
'''
cpu_res = {}
for parsed_node in self.gmond_parsed_tree_list:
for host in self.get_host_list(parsed_node):
host_ip = host['IP']
cpu_num = 0
cpu_user = 0.0
cpu_system = 0.0
cpu_user = float(self.get_metric_value(parsed_node, host['NAME'], "cpu_user"))
cpu_system = float(self.get_metric_value(parsed_node, host['NAME'], "cpu_system"))
cpu_num = int(self.get_metric_value(parsed_node, host['NAME'], "cpu_num"))
cpu_load = round(((cpu_user + cpu_system) * cpu_num) / 100, 2)
try:
cpu_res[host_ip].append(cpu_load)
except KeyError:
cpu_res[host_ip] = [cpu_load]
return cpu_res
def get_formatted_datetime(self, parsed_node):
'''
Returns the snapshot timestamp as a formatted string. This is the
time when the stats in parsed_node were collected.
'''
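# Example output (illustrative): "[14:3:5]" -- note that hour/minute/second
# are not zero-padded.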
now = parsed_node['dt']
fmt_dt = "[" + str(now.hour) + ":" + str(now.minute) + \
":" + str(now.second) + "]"
return fmt_dt
def get_formatted_host_row(self, host_list):
'''
Returns the hosts in formatted order (for printing purposes)
'''
row_str = "".ljust(10)
for host in host_list:
row_str += host['NAME'].ljust(15)
return row_str
def get_formatted_metric_row(self, parsed_node, metric, justval):
'''
Returns a specific metric for all hosts in the same row
as a formatted string (for printing)
'''
host_list = self.get_host_list(parsed_node)
row_str = metric.ljust(len(metric) + 2)
for host in host_list:
val = self.get_metric_value(parsed_node, host['NAME'], metric)
row_str += str(val).ljust(justval)
return row_str
def dump_cpu_stats(self):
'''
Print the CPU stats
'''
hl_len = 80
print "-" * hl_len
print "CPU Statistics: ",
for parsed_node in self.gmond_parsed_tree_list:
hosts = self.get_host_list(parsed_node)
print self.get_formatted_datetime(parsed_node)
print self.get_formatted_host_row(hosts)
print "-" * hl_len
print self.get_formatted_metric_row(parsed_node, "cpu_user", 18)
print self.get_formatted_metric_row(parsed_node, "cpu_system", 18)
print "Aggregate ",
for host in hosts:
print str(self.get_aggregate_cpu_usage(parsed_node,
host['NAME'])).ljust(16),
print "\n"
def dump_gmond_parsed_tree(self):
'''
Display the full tree parsed from the gmond server stats.
'''
hl_len = 60
for parsed_node in self.gmond_parsed_tree_list:
print "%-20s (%s) URL: %s " % \
(parsed_node['CLUSTER-NAME'],
parsed_node['LOCALTIME'],
parsed_node['URL'])
print "-" * hl_len
row_str = " ".ljust(9)
for host in parsed_node['hosts']:
row_str += host['NAME'].ljust(15)
row_str += "\n"
print row_str
print "-" * hl_len
metric_count = len(parsed_node['hosts'][0]['metrics'])
for count in range(0, metric_count):
row_str = ""
# Use the first host's metric list to label the row
row_str += parsed_node['hosts'][0]['metrics'][count]['NAME'].ljust(18)
for host in parsed_node['hosts']:
val = str(self.get_metric_value(parsed_node, host['NAME'],
host['metrics'][count]['NAME']))
row_str += val.ljust(12)
print row_str
##################################################
# Only invoke the module directly for test purposes. Should be
# invoked from pns script.
##################################################
def main():
print "main: monitor"
gmon = Monitor("172.22.191.151", 8649)
gmon.start_monitoring_thread(freq=5, count=20)
print "wait for 15 seconds"
time.sleep(20)
print "Now force the thread to stop"
gmon.stop_monitoring_thread()
gmon.dump_cpu_stats()
cpu_metric = gmon.build_cpu_metrics()
print "cpu_metric: ", cpu_metric
if __name__ == "__main__":
main()