#!/usr/bin/env python # Copyright 2014 Cisco Systems, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # ''' Module for parsing statistical output from Ganglia (gmond) server The module opens a socket connection to collect statistical data. It parses the raw data in xml format. The data from ganglia/gmond is in a heirarchical xml format as below: : : ## Usage: Using the module is simple. 1. instantiate the Monitor with the gmond server ip and port to poll. gmon = Monitor("172.22.191.151", 8649) 2. Start the monitoring thread gmon.start_monitoring_thread(frequency, count) < run tests/tasks> gmon.stop_monitoring_thread() 3. Collecting stats: cpu_metric = gmon.build_cpu_metric() Returns a dictionary object with all the cpu stats for each node ''' import datetime import re import socket import subprocess from threading import Thread import time from lxml import etree class MonitorExecutor(Thread): ''' Thread handler class to asynchronously collect stats ''' THREAD_STOPPED = 0 THREAD_RUNNING = 1 def __init__(self, gmond_svr, gmond_port, freq=5, count=5): super(MonitorExecutor, self).__init__() self.gmond_svr_ip = gmond_svr self.gmond_port = gmond_port self.freq = freq self.count = count self.force_stop = False self.thread_status = MonitorExecutor.THREAD_STOPPED # This dictionary always holds the latest metric. self.gmond_parsed_tree_list = [] def run(self): ''' The thread runnable method. The function will periodically poll the gmond server and collect the metrics. ''' self.thread_status = MonitorExecutor.THREAD_RUNNING count = self.count while count > 0: if self.force_stop: self.thread_status = MonitorExecutor.THREAD_STOPPED return self.parse_gmond_xml_data() count -= 1 time.sleep(self.freq) self.thread_status = MonitorExecutor.THREAD_STOPPED def set_force_stop(self): ''' Setting the force stop flag to stop the thread. By default the thread stops after the specific count/iterations is reached ''' self.force_stop = True def parse_gmond_xml_data(self): ''' Parse gmond data (V2) Retrieve the ganglia stats from the aggregation node :return: None in case of error or a dictionary containing the stats ''' gmond_parsed_tree = {} raw_data = self.retrieve_stats_raw() if raw_data is None or len(raw_data) == 0: print "Failed to retrieve stats from server" return xtree = etree.XML(raw_data) ############################################ # Populate cluster information. ############################################ for elem in xtree.iter('CLUSTER'): gmond_parsed_tree['CLUSTER-NAME'] = str(elem.get('NAME')) gmond_parsed_tree['LOCALTIME'] = str(elem.get('LOCALTIME')) gmond_parsed_tree['URL'] = str(elem.get('URL')) host_list = [] for helem in elem.iterchildren(): host = {} host['NAME'] = str(helem.get('NAME')) host['IP'] = str(helem.get('IP')) host['REPORTED'] = str(helem.get('REPORTED')) host['TN'] = str(helem.get('TN')) host['TMAX'] = str(helem.get('TMAX')) host['DMAX'] = str(helem.get('DMAX')) host['LOCATION'] = str(helem.get('LOCATION')) host['GMOND_STARTED'] = str(helem.get('GMOND_STARTED')) mlist = [] for metric in helem.iterchildren(): mdic = {} mdic['NAME'] = str(metric.get('NAME')) mdic['VAL'] = str(metric.get('VAL')) mlist.append(mdic) host['metrics'] = mlist host_list.append(host) gmond_parsed_tree['hosts'] = host_list stat_dt = datetime.datetime.now() gmond_parsed_tree['dt'] = stat_dt self.gmond_parsed_tree_list.append(gmond_parsed_tree) def retrieve_stats_raw(self): ''' Retrieve stats from the gmond process. ''' soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) soc.settimeout(10) try: soc.connect((self.gmond_svr_ip, self.gmond_port)) except socket.error as exp: print "Connection failure host: %s [%s]" % (self.gmond_svr_ip, exp) return None data = "" while True: try: rbytes = soc.recv(4096) except socket.error as exp: print "Read failed for host: ", str(exp) return None if len(rbytes) == 0: break data += rbytes soc.close() return data class Monitor(object): gmond_svr_ip = None gmond_port = None gmond_parsed_tree = {} def __init__(self, gmond_svr, gmond_port=8649): ''' The constructor simply sets the values of the gmond server and port. ''' self.gmond_svr_ip = gmond_svr self.gmond_port = gmond_port # List of all stats. self.gmond_parsed_tree_list = [] # series for all cpu loads self.cpu_res = {} self.mon_thread = None def start_monitoring_thread(self, freq=10, count=10): ''' Start the monitoring thread. ''' self.mon_thread = MonitorExecutor(self.gmond_svr_ip, self.gmond_port, freq, count) self.mon_thread.start() def stop_monitoring_thread(self): self.mon_thread.set_force_stop() self.gmond_parsed_tree_list = self.mon_thread.gmond_parsed_tree_list def strip_raw_telnet_output(self, raw_data): ''' When using the retrieve_stats_raw_telent api, the raw data has some additional text along with the xml data. We need to strip that before we can invoke pass it through the lxml parser. ''' data = "" xml_flag = False for line in raw_data.splitlines(): if re.match(r".*