#!/usr/bin/python3 """ Copyright (c) 2017 Wind River Systems, Inc. SPDX-License-Identifier: Apache-2.0 """ import os import sys import time import datetime import psutil import fcntl import logging from six.moves import configparser import itertools import six from multiprocessing import Process from multiprocessing import cpu_count from subprocess import Popen from subprocess import PIPE from collections import OrderedDict from six.moves import input def generateString(meas, tag_n, tag_v, field_n, field_v): """generates the required string for the areas where fields are not static""" base = "{},".format(meas) try: for i in range(len(tag_n)): if i == len(tag_n) - 1: # have space between tags and fields base += "'{}'='{}' ".format(tag_n[i], str(tag_v[i])) else: # separate with commas base += "'{}'='{}',".format(tag_n[i], str(tag_v[i])) for i in range(len(field_v)): if str(field_v[i]).replace(".", "").isdigit(): if i == len(field_v) - 1: base += "'{}'='{}'".format(field_n[i], str(field_v[i])) else: base += "'{}'='{}',".format(field_n[i], str(field_v[i])) return base except IndexError: return None def collectMemtop(influx_info, node, ci): """collects system memory information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("memtop data starting collection with a collection interval of {}s".format(ci["memtop"])) measurement = "memtop" tags = {"node": node} MiB = 1024.0 while True: try: fields = OrderedDict([("total", 0), ("used", 0), ("free", 0), ("cached", 0), ("buf", 0), ("slab", 0), ("cas", 0), ("clim", 0), ("dirty", 0), ("wback", 0), ("anon", 0), ("avail", 0)]) with open("/proc/meminfo", "r") as f: hps = 0 # for each line in /proc/meminfo, match with element in fields for line in f: line = line.strip("\n").split() if line[0].strip(":").startswith("MemTotal"): # convert to from kibibytes to mibibytes fields["total"] = float(line[1]) / MiB elif line[0].strip(":").startswith("MemFree"): fields["free"] = int(line[1]) / MiB elif line[0].strip(":").startswith("MemAvailable"): fields["avail"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Buffers"): fields["buf"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Cached"): fields["cached"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Slab"): fields["slab"] = float(line[1]) / MiB elif line[0].strip(":").startswith("CommitLimit"): fields["clim"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Committed_AS"): fields["cas"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Dirty"): fields["dirty"] = float(line[1]) / MiB elif line[0].strip(":").startswith("Writeback"): fields["wback"] = float(line[1]) / MiB elif line[0].strip(":").endswith("(anon)"): fields["anon"] += float(line[1]) / MiB elif line[0].strip(":").endswith("Hugepagesize"): hps = float(line[1]) / MiB fields["used"] = fields["total"] - fields["avail"] f.close() # get platform specific memory info fields["platform_avail"] = 0 fields["platform_hfree"] = 0 for file in os.listdir("/sys/devices/system/node"): if file.startswith("node"): node_num = file.replace("node", "").strip("\n") avail = hfree = 0 with open("/sys/devices/system/node/{}/meminfo".format(file)) as f1: for line in f1: line = line.strip("\n").split() if line[2].strip(":").startswith("MemFree") or line[2].strip(":").startswith("FilePages") or line[2].strip(":").startswith("SReclaimable"): avail += float(line[3]) elif line[2].strip(":").startswith("HugePages_Free"): hfree = float(line[3]) * hps fields["{}:avail".format(node_num)] = avail / MiB fields["{}:hfree".format(node_num)] = hfree # get platform sum fields["platform_avail"] += avail / MiB fields["platform_hfree"] += hfree f1.close() s = generateString(measurement, list(tags.keys()), list(tags.values()), list(fields.keys()), list(fields.values())) if s is None: good_string = False else: good_string = True if good_string: # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], s), shell=True) p.communicate() time.sleep(ci["memtop"]) except KeyboardInterrupt: break except Exception: logging.error("memtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectMemstats(influx_info, node, ci, services, syseng_services, openstack_services, exclude_list, skip_list, collect_all): """collects rss and vsz information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("memstats data starting collection with a collection interval of {}s".format(ci["memstats"])) measurement = "memstats" tags = {"node": node} ps_output = None influx_string = "" while True: try: fields = {} ps_output = Popen("exec ps -e -o rss,vsz,cmd", shell=True, stdout=PIPE) # create dictionary of dictionaries if collect_all is False: for svc in services: fields[svc] = {"rss": 0, "vsz": 0} fields["static_syseng"] = {"rss": 0, "vsz": 0} fields["live_syseng"] = {"rss": 0, "vsz": 0} fields["total"] = {"rss": 0, "vsz": 0} ps_output.stdout.readline() while True: # for each line in ps output, get rss and vsz info line = ps_output.stdout.readline().strip("\n").split() # if at end of output, send data if not line: break else: rss = float(line[0]) vsz = float(line[1]) # go through all command outputs for i in range(2, len(line)): # remove unwanted characters and borders from cmd name. Ex: /usr/bin/example.py -> example.py svc = line[i].replace("(", "").replace(")", "").strip(":").split("/")[-1].strip("\n") if svc == "gunicorn": gsvc = line[-1].replace("[", "").replace("]", "").strip("\n") if gsvc == "public:application": gsvc = "keystone-public" elif gsvc == "admin:application": gsvc = "keystone-admin" gsvc = "gunicorn_{}".format(gsvc) if gsvc not in fields: fields[gsvc] = {"rss": rss, "vsz": vsz} else: fields[gsvc]["rss"] += rss fields[gsvc]["vsz"] += vsz elif svc == "postgres": if (len(line) <= i + 2): # Command line could be "sudo su postgres", skip it break if line[i + 1].startswith("-") is False and line[i + 1].startswith("_") is False and line[i + 1] != "psql": psvc = "" if line[i + 2] in openstack_services: psvc = line[i + 2].strip("\n") else: for j in range(i + 1, len(line)): psvc += "{}_".format(line[j].strip("\n")) psvc = "postgres_{}".format(psvc).strip("_") if psvc not in fields: fields[psvc] = {"rss": rss, "vsz": vsz} else: fields[psvc]["rss"] += rss fields[psvc]["vsz"] += vsz if collect_all is False: if svc in services: fields[svc]["rss"] += rss fields[svc]["vsz"] += vsz fields["total"]["rss"] += rss fields["total"]["vsz"] += vsz break elif svc in syseng_services: if svc == "live_stream.py": fields["live_syseng"]["rss"] += rss fields["live_syseng"]["vsz"] += vsz else: fields["static_syseng"]["rss"] += rss fields["static_syseng"]["vsz"] += vsz fields["total"]["rss"] += rss fields["total"]["vsz"] += vsz break # Collect all services else: if svc in exclude_list or svc.startswith("-") or svc[0].isdigit() or svc.startswith("[") or svc.endswith("]"): continue elif svc in skip_list or svc.startswith("IPaddr"): break else: if svc not in fields: fields[svc] = {"rss": rss, "vsz": vsz} else: fields[svc]["rss"] += rss fields[svc]["vsz"] += vsz fields["total"]["rss"] += rss fields["total"]["vsz"] += vsz break # send data to InfluxDB for key in fields: influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", key, "rss", fields[key]["rss"], "vsz", fields[key]["vsz"]) + "\n" p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" ps_output.kill() time.sleep(ci["memstats"]) except KeyboardInterrupt: if ps_output is not None: ps_output.kill() break except Exception: logging.error("memstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectSchedtop(influx_info, node, ci, services, syseng_services, openstack_services, exclude_list, skip_list, collect_all): """collects task cpu information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("schedtop data starting collection with a collection interval of {}s".format(ci["schedtop"])) measurement = "schedtop" tags = {"node": node} influx_string = "" top_output = Popen("exec top -b -c -w 512 -d{}".format(ci["schedtop"]), shell=True, stdout=PIPE) while True: try: fields = {} pro = psutil.Process(top_output.pid) # if process dies, restart it if pro.status() == "zombie": top_output.kill() top_output = Popen("exec top -b -c -w 512 -d{}".format(ci["schedtop"]), shell=True, stdout=PIPE) if collect_all is False: for svc in services: fields[svc] = 0 fields["static_syseng"] = 0 fields["live_syseng"] = 0 fields["total"] = 0 # check first line line = top_output.stdout.readline() if not line: pass else: # skip header completely for _ in range(6): top_output.stdout.readline() while True: line = top_output.stdout.readline().strip("\n").split() # if end of top output, leave this while loop if not line: break else: occ = float(line[8]) # for each command listed, check if it matches one from the list for i in range(11, len(line)): # remove unwanted characters and borders from cmd name. Ex: /usr/bin/example.py -> example.py svc = line[i].replace("(", "").replace(")", "").strip(":").split("/")[-1] if svc == "gunicorn": gsvc = line[-1].replace("[", "").replace("]", "").strip("\n") if gsvc == "public:application": gsvc = "keystone-public" elif gsvc == "admin:application": gsvc = "keystone-admin" gsvc = "gunicorn_{}".format(gsvc) if gsvc not in fields: fields[gsvc] = occ else: fields[gsvc] += occ elif svc == "postgres": if (len(line) <= i + 2): # Command line could be "sudo su postgres", skip it break if line[i + 1].startswith("-") is False and line[i + 1].startswith("_") is False and line[i + 1] != "psql": psvc = "" if line[i + 2] in openstack_services: psvc = line[i + 2].strip("\n") else: for j in range(i + 1, len(line)): psvc += "{}_".format(line[j].strip("\n")) psvc = "postgres_{}".format(psvc).strip("_") if psvc not in fields: fields[psvc] = occ else: fields[psvc] += occ if collect_all is False: if svc in services: fields[svc] += occ fields["total"] += occ break elif svc in syseng_services: if svc == "live_stream.py": fields["live_syseng"] += occ else: fields["static_syseng"] += occ fields["total"] += occ break # Collect all services else: if svc in exclude_list or svc.startswith("-") or svc[0].isdigit() or svc.startswith("[") or svc.endswith("]"): continue elif svc in skip_list or svc.startswith("IPaddr"): break else: if svc not in fields: fields[svc] = occ else: fields[svc] += occ fields["total"] += occ break for key in fields: influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", key, "occ", fields[key]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["schedtop"]) except KeyboardInterrupt: if top_output is not None: top_output.kill() break except Exception: logging.error("schedtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectDiskstats(influx_info, node, ci): """collects disk utilization information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("diskstats data starting collection with a collection interval of {}s".format(ci["diskstats"])) measurement = "diskstats" tags = {"node": node, "file_system": None, "type": None, "mount": None} fields = {"size": 0, "used": 0, "avail": 0, "usage": 0} influx_string = "" while True: try: parts = psutil.disk_partitions() for i in parts: # gather all partitions tags["mount"] = str(i[1]).split("/")[-1] # if mount == '', call it root if tags["mount"] == "": tags["mount"] = "root" # skip boot elif tags["mount"] == "boot": continue tags["file_system"] = str(i[0]).split("/")[-1] tags["type"] = i[2] u = psutil.disk_usage(i[1]) fields["size"] = u[0] fields["used"] = u[1] fields["avail"] = u[2] fields["usage"] = u[3] influx_string += "{},'{}'='{}','{}'='{}','{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "file_system", tags["file_system"], "type", tags["type"], "mount", tags["mount"], "size", fields["size"], "used", fields["used"], "avail", fields["avail"], "usage", fields["usage"]) + "\n" p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["diskstats"]) except KeyboardInterrupt: break except Exception: logging.error("diskstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectIostat(influx_info, node, ci): """collect device I/O information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("iostat data starting collection with a collection interval of {}s".format(ci["iostat"])) measurement = "iostat" tags = {"node": node} sector_size = 512.0 influx_string = "" while True: try: fields = {} tmp = {} tmp1 = {} start = time.time() # get initial values for dev in os.listdir("/sys/block/"): if dev.startswith("sr"): continue else: fields[dev] = {"r/s": 0, "w/s": 0, "io/s": 0, "rkB/s": 0, "wkB/s": 0, "rrqms/s": 0, "wrqms/s": 0, "util": 0} tmp[dev] = {"init_reads": 0, "init_reads_merged": 0, "init_read_sectors": 0, "init_read_wait": 0, "init_writes": 0, "init_writes_merged": 0, "init_write_sectors": 0, "init_write_wait": 0, "init_io_progress": 0, "init_io_time": 0, "init_wait_time": 0} with open("/sys/block/{}/stat".format(dev), "r") as f: # get initial readings line = f.readline().strip("\n").split() tmp[dev]["init_reads"] = int(line[0]) tmp[dev]["init_reads_merged"] = int(line[1]) tmp[dev]["init_read_sectors"] = int(line[2]) tmp[dev]["init_read_wait"] = int(line[3]) tmp[dev]["init_writes"] = int(line[4]) tmp[dev]["init_writes_merged"] = int(line[5]) tmp[dev]["init_write_sectors"] = int(line[6]) tmp[dev]["init_write_wait"] = int(line[7]) tmp[dev]["init_io_progress"] = int(line[8]) tmp[dev]["init_io_time"] = int(line[9]) tmp[dev]["init_wait_time"] = int(line[10]) time.sleep(ci["iostat"]) dt = time.time() - start # get values again for dev in os.listdir("/sys/block/"): if dev.startswith("sr"): continue else: # during a swact, some devices may not have been read in the initial reading. If found now, add them to dict if dev not in fields: fields[dev] = {"r/s": 0, "w/s": 0, "io/s": 0, "rkB/s": 0, "wkB/s": 0, "rrqms/s": 0, "wrqms/s": 0, "util": 0} tmp1[dev] = {"reads": 0, "reads_merged": 0, "read_sectors": 0, "read_wait": 0, "writes": 0, "writes_merged": 0, "write_sectors": 0, "write_wait": 0, "io_progress": 0, "io_time": 0, "wait_time": 0} with open("/sys/block/{}/stat".format(dev), "r") as f: line = f.readline().strip("\n").split() tmp1[dev]["reads"] = int(line[0]) tmp1[dev]["reads_merged"] = int(line[1]) tmp1[dev]["read_sectors"] = int(line[2]) tmp1[dev]["read_wait"] = int(line[3]) tmp1[dev]["writes"] = int(line[4]) tmp1[dev]["writes_merged"] = int(line[5]) tmp1[dev]["write_sectors"] = int(line[6]) tmp1[dev]["write_wait"] = int(line[7]) tmp1[dev]["io_progress"] = int(line[8]) tmp1[dev]["io_time"] = int(line[9]) tmp1[dev]["wait_time"] = int(line[10]) # take difference and divide by delta t for key in fields: # if device was found in initial and second reading, do calculation if key in tmp and key in tmp1: fields[key]["r/s"] = abs(tmp1[key]["reads"] - tmp[key]["init_reads"]) / dt fields[key]["w/s"] = abs(tmp1[key]["writes"] - tmp[key]["init_writes"]) / dt fields[key]["rkB/s"] = abs(tmp1[key]["read_sectors"] - tmp[key]["init_read_sectors"]) * sector_size / dt / 1000 fields[key]["wkB/s"] = abs(tmp1[key]["write_sectors"] - tmp[key]["init_write_sectors"]) * sector_size / dt / 1000 fields[key]["rrqms/s"] = abs(tmp1[key]["reads_merged"] - tmp[key]["init_reads_merged"]) / dt fields[key]["wrqms/s"] = abs(tmp1[key]["writes_merged"] - tmp[key]["init_writes_merged"]) / dt fields[key]["io/s"] = fields[key]["r/s"] + fields[key]["w/s"] + fields[key]["rrqms/s"] + fields[key]["wrqms/s"] fields[key]["util"] = abs(tmp1[key]["io_time"] - tmp[key]["init_io_time"]) / dt / 10 influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "device", key, "r/s", fields[key]["r/s"], "w/s", fields[key]["w/s"], "rkB/s", fields[key]["rkB/s"], "wkB/s", fields[key]["wkB/s"], "rrqms/s", fields[key]["rrqms/s"], "wrqms/s", fields[key]["wrqms/s"], "io/s", fields[key]["io/s"], "util", fields[key]["util"]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" except KeyboardInterrupt: break except Exception: logging.error("iostat collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectLoadavg(influx_info, node, ci): """collects cpu load average information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("load_avg data starting collection with a collection interval of {}s".format(ci["load_avg"])) measurement = "load_avg" tags = {"node": node} fields = {"load_avg": 0} while True: try: fields["load_avg"] = os.getloadavg()[0] p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{},'{}'='{}' '{}'='{}''".format(influx_info[0], influx_info[1], influx_info[2], measurement, "node", tags["node"], "load_avg", fields["load_avg"]), shell=True) p.communicate() time.sleep(ci["load_avg"]) except KeyboardInterrupt: break except Exception: logging.error("load_avg collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectOcctop(influx_info, node, ci, pc): """collects cpu utilization information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("occtop data starting collection with a collection interval of {}s".format(ci["occtop"])) measurement = "occtop" tags = {"node": node} platform_cores = pc influx_string = "" while True: try: cpu = psutil.cpu_percent(percpu=True) cpu_times = psutil.cpu_times_percent(percpu=True) fields = {} # sum all cpu percents total = float(sum(cpu)) sys_total = 0 fields["platform_total"] = {"usage": 0, "system": 0} cores = 0 # for each core, get values and assign a tag for el in cpu: fields["usage"] = float(el) fields["system"] = float(cpu_times[cores][2]) sys_total += float(cpu_times[cores][2]) tags["core"] = "core_{}".format(cores) influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", tags["core"], "usage", fields["usage"], "system", fields["system"]) + "\n" if len(platform_cores) > 0: if cores in platform_cores: fields["platform_total"]["usage"] += float(el) fields["platform_total"]["system"] += float(cpu_times[cores][2]) cores += 1 # add usage and system total to influx string if len(platform_cores) > 0: influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", "platform_total", "usage", fields["platform_total"]["usage"], "system", fields["platform_total"]["system"]) + "\n" influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", "total", "usage", total, "system", sys_total) + "\n" # send data to Influx p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["occtop"]) except KeyboardInterrupt: break except Exception: logging.error("occtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectNetstats(influx_info, node, ci): """collects network interface information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("netstats data starting collection with a collection interval of {}s".format(ci["netstats"])) measurement = "netstats" tags = {"node": node} fields = {} prev_fields = {} Mbps = float(1000000 / 8) influx_string = "" while True: try: net = psutil.net_io_counters(pernic=True) # get initial data for difference calculation for key in net: prev_fields[key] = {"tx_B": net[key][0], "rx_B": net[key][1], "tx_p": net[key][2], "rx_p": net[key][3]} start = time.time() time.sleep(ci["netstats"]) net = psutil.net_io_counters(pernic=True) # get new data for difference calculation dt = time.time() - start for key in net: tx_B = (float(net[key][0]) - float(prev_fields[key]["tx_B"])) tx_Mbps = tx_B / Mbps / dt rx_B = (float(net[key][1]) - float(prev_fields[key]["rx_B"])) rx_Mbps = rx_B / Mbps / dt tx_pps = (float(net[key][2]) - float(prev_fields[key]["tx_p"])) / dt rx_pps = (float(net[key][3]) - float(prev_fields[key]["rx_p"])) / dt # ensure no division by zero if rx_B > 0 and rx_pps > 0: rx_packet_size = rx_B / rx_pps else: rx_packet_size = 0 if tx_B > 0 and tx_pps > 0: tx_packet_size = tx_B / tx_pps else: tx_packet_size = 0 fields[key] = {"tx_mbps": tx_Mbps, "rx_mbps": rx_Mbps, "tx_pps": tx_pps, "rx_pps": rx_pps, "tx_packet_size": tx_packet_size, "rx_packet_size": rx_packet_size} for key in fields: influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "interface", key, "rx_mbps", fields[key]["rx_mbps"], "tx_mbps", fields[key]["tx_mbps"], "rx_pps", fields[key]["rx_pps"], "tx_pps", fields[key]["tx_pps"], "rx_packet_size", fields[key]["rx_packet_size"], "tx_packet_size", fields[key]["tx_packet_size"]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" except KeyboardInterrupt: break except Exception: logging.error("netstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectPostgres(influx_info, node, ci): """collects postgres db size and postgres service size information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("postgres data starting collection with a collection interval of {}s".format(ci["postgres"])) measurement = "postgres_db_size" measurement1 = "postgres_svc_stats" tags = {"node": node, "service": None, "table_schema": 0, "table": None} fields = {"db_size": 0, "connections": 0} fields1 = {"table_size": 0, "total_size": 0, "index_size": 0, "live_tuples": 0, "dead_tuples": 0} postgres_output = postgres_output1 = None influx_string = influx_string1 = "" good_string = False dbcount = 0 BATCH_SIZE = 10 while True: try: # make sure this is active controller, otherwise postgres queries wont work if isActiveController(): while True: postgres_output = Popen("sudo -u postgres psql --pset pager=off -q -t -c'SELECT datname, pg_database_size(datname) FROM pg_database WHERE datistemplate = false;'", shell=True, stdout=PIPE) db_lines = postgres_output.stdout.read().replace(" ", "").strip().split("\n") if db_lines == "" or db_lines is None: postgres_output.kill() break else: # for each database from the previous output for line in db_lines: if not line: break line = line.replace(" ", "").split("|") tags["service"] = line[0] fields["db_size"] = line[1] # send DB size to InfluxDB influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "db_size", fields["db_size"]) + "\n" # get tables for each database sql = "SELECT table_schema,table_name,pg_size_pretty(table_size) AS table_size,pg_size_pretty(indexes_size) AS indexes_size,pg_size_pretty(total_size) AS total_size,live_tuples,dead_tuples FROM (SELECT table_schema,table_name,pg_table_size(table_name) AS table_size,pg_indexes_size(table_name) AS indexes_size,pg_total_relation_size(table_name) AS total_size,pg_stat_get_live_tuples(table_name::regclass) AS live_tuples,pg_stat_get_dead_tuples(table_name::regclass) AS dead_tuples FROM (SELECT table_schema,table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE') AS all_tables ORDER BY total_size DESC) AS pretty_sizes;" postgres_output1 = Popen('sudo -u postgres psql --pset pager=off -q -t -d{} -c"{}"'.format(line[0], sql), shell=True, stdout=PIPE) tbl_lines = postgres_output1.stdout.read().replace(" ", "").strip().split("\n") for line in tbl_lines: if line == "": continue else: line = line.replace(" ", "").split("|") elements = list() # ensures all data is present if len(line) != 7: good_string = False break else: # do some conversions for el in line: if el.endswith("bytes"): el = int(el.replace("bytes", "")) elif el.endswith("kB"): el = el.replace("kB", "") el = int(el) * 1000 elif el.endswith("MB"): el = el.replace("MB", "") el = int(el) * 1000000 elif el.endswith("GB"): el = el.replace("GB", "") el = int(el) * 1000000000 elements.append(el) tags["table_schema"] = elements[0] tags["table"] = elements[1] fields1["table_size"] = int(elements[2]) fields1["index_size"] = int(elements[3]) fields1["total_size"] = int(elements[4]) fields1["live_tuples"] = int(elements[5]) fields1["dead_tuples"] = int(elements[6]) influx_string1 += "{},'{}'='{}','{}'='{}','{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement1, "node", tags["node"], "service", tags["service"], "table_schema", tags["table_schema"], "table", tags["table"], "table_size", fields1["table_size"], "index_size", fields1["index_size"], "total_size", fields1["total_size"], "live_tuples", fields1["live_tuples"], "dead_tuples", fields1["dead_tuples"]) + "\n" good_string = True dbcount += 1 if dbcount == BATCH_SIZE and good_string: # Curl will barf if the batch is too large p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string1), shell=True) p.communicate() influx_string1 = "" dbcount = 0 if good_string: # send table data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string1), shell=True) p.communicate() influx_string = influx_string1 = "" dbcount = 0 time.sleep(ci["postgres"]) postgres_output1.kill() postgres_output.kill() else: time.sleep(20) except KeyboardInterrupt: if postgres_output is not None: postgres_output.kill() if postgres_output1 is not None: postgres_output1.kill() break except Exception: logging.error("postgres collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectPostgresConnections(influx_info, node, ci, fast): """collect postgres connections information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) if fast: logging.info("postgres_connections data starting collection with a constant collection interval") else: logging.info("postgres_connections data starting collection with a collection interval of {}s".format(ci["postgres"])) measurement = "postgres_connections" tags = {"node": node, "service": None, "state": None} connections_output = None influx_string = "" while True: try: # make sure this is active controller, otherwise postgres queries wont work if isActiveController(): while True: fields = {} # outputs a list of postgres dbs and their connections connections_output = Popen("sudo -u postgres psql --pset pager=off -q -c 'SELECT datname,state,count(*) from pg_stat_activity group by datname,state;'", shell=True, stdout=PIPE) line = connections_output.stdout.readline() if line == "" or line is None: break # skip header connections_output.stdout.readline() while True: line = connections_output.stdout.readline().strip("\n") if not line: break else: line = line.replace(" ", "").split("|") if len(line) != 3: continue else: svc = line[0] connections = int(line[2]) tags["service"] = svc if svc not in fields: fields[svc] = {"active": 0, "idle": 0, "other": 0} if line[1] == "active": fields[svc]["active"] = connections elif line[1] == "idle": fields[svc]["idle"] = connections else: fields[svc]["other"] = connections influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", "active", "connections", fields[svc]["active"]) + "\n" influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", "idle", "connections", fields[svc]["idle"]) + "\n" influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", "other", "connections", fields[svc]["other"]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" connections_output.kill() if fast: pass else: time.sleep(ci["postgres"]) else: time.sleep(20) except KeyboardInterrupt: if connections_output is not None: connections_output.kill() break except Exception: logging.error("postgres_connections collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectRabbitMq(influx_info, node, ci): """collects rabbitmq information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("rabbitmq data starting collection with a collection interval of {}s".format(ci["rabbitmq"])) measurement = "rabbitmq" tags = OrderedDict([("node", node)]) rabbitmq_output = None while True: try: # make sure this is active controller, otherwise rabbit queries wont work if isActiveController(): while True: fields = OrderedDict([]) rabbitmq_output = Popen("sudo rabbitmqctl -n rabbit@localhost status", shell=True, stdout=PIPE) # needed data starts where output = '{memory,[' line = rabbitmq_output.stdout.readline() # if no data is returned, exit if line == "" or line is None: rabbitmq_output.kill() break else: line = rabbitmq_output.stdout.read().strip("\n").split("{memory,[") if len(line) != 2: rabbitmq_output.kill() break else: # remove brackets from data info = line[1].replace(" ", "").replace("{", "").replace("}", "").replace("\n", "").replace("[", "").replace("]", "").split(",") for i in range(len(info) - 3): if info[i].endswith("total"): info[i] = info[i].replace("total", "memory_total") # some data needs string manipulation if info[i].startswith("clustering") or info[i].startswith("amqp"): info[i] = "listeners_" + info[i] if info[i].startswith("total_"): info[i] = "descriptors_" + info[i] if info[i].startswith("limit") or info[i].startswith("used"): info[i] = "processes_" + info[i] if info[i].replace("_", "").isalpha() and info[i + 1].isdigit(): fields[info[i]] = info[i + 1] s = generateString(measurement, list(tags.keys()), list(tags.values()), list(fields.keys()), list(fields.values())) if s is None: rabbitmq_output.kill() else: # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], s), shell=True) p.communicate() time.sleep(ci["rabbitmq"]) rabbitmq_output.kill() else: time.sleep(20) except KeyboardInterrupt: if rabbitmq_output is not None: rabbitmq_output.kill() break except Exception: logging.error("rabbitmq collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectRabbitMqSvc(influx_info, node, ci, services): """collects rabbitmq messaging information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("rabbitmq_svc data starting collection with a collection interval of {}s".format(ci["rabbitmq"])) measurement = "rabbitmq_svc" tags = {"node": node, "service": None} fields = {"messages": 0, "messages_ready": 0, "messages_unacknowledged": 0, "memory": 0, "consumers": 0} rabbitmq_svc_output = None good_string = False influx_string = "" while True: try: # make sure this is active controller, otherwise rabbit queries wont work if isActiveController(): while True: rabbitmq_svc_output = Popen("sudo rabbitmqctl -n rabbit@localhost list_queues name messages messages_ready messages_unacknowledged memory consumers", shell=True, stdout=PIPE) # # if no data is returned, exit if rabbitmq_svc_output.stdout.readline() == "" or rabbitmq_svc_output.stdout.readline() is None: rabbitmq_svc_output.kill() break else: for line in rabbitmq_svc_output.stdout: line = line.split() if not line: break else: if len(line) != 6: good_string = False break else: # read line and fill fields if line[0] in services: tags["service"] = line[0] fields["messages"] = line[1] fields["messages_ready"] = line[2] fields["messages_unacknowledged"] = line[3] fields["memory"] = line[4] fields["consumers"] = line[5] influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "messages", fields["messages"], "messages_ready", fields["messages_ready"], "messages_unacknowledged", fields["messages_unacknowledged"], "memory", fields["memory"], "consumers", fields["consumers"]) + "\n" good_string = True if good_string: # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["rabbitmq"]) rabbitmq_svc_output.kill() else: time.sleep(20) except KeyboardInterrupt: if rabbitmq_svc_output is not None: rabbitmq_svc_output.kill() break except Exception: logging.error("rabbitmq_svc collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectFilestats(influx_info, node, ci, services, syseng_services, exclude_list, skip_list, collect_all): """collects open file information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("filestats data starting collection with a collection interval of {}s".format(ci["filestats"])) measurement = "filestats" tags = {"node": node} influx_string = "" while True: try: fields = {} # fill dict with services from engtools.conf if collect_all is False: for svc in services: fields[svc] = {"read/write": 0, "write": 0, "read": 0} fields["static_syseng"] = {"read/write": 0, "write": 0, "read": 0} fields["live_syseng"] = {"read/write": 0, "write": 0, "read": 0} fields["total"] = {"read/write": 0, "write": 0, "read": 0} for process in os.listdir("/proc/"): if process.isdigit(): # sometimes the process dies before reading its info try: svc = psutil.Process(int(process)).name() svc = svc.split()[0].replace("(", "").replace(")", "").strip(":").split("/")[-1] except Exception: continue if collect_all is False: if svc in services: try: p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE) p.stdout.readline() while True: line = p.stdout.readline().strip("\n").split() if not line: break else: priv = line[0] if priv[1] == "r" and priv[2] == "w": fields[svc]["read/write"] += 1 fields["total"]["read/write"] += 1 elif priv[1] == "r" and priv[2] != "w": fields[svc]["read"] += 1 fields["total"]["read"] += 1 elif priv[1] != "r" and priv[2] == "w": fields[svc]["write"] += 1 fields["total"]["write"] += 1 except Exception: p.kill() continue p.kill() elif svc in syseng_services: try: p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE) p.stdout.readline() while True: line = p.stdout.readline().strip("\n").split() if not line: break else: priv = line[0] if svc == "live_stream.py": if priv[1] == "r" and priv[2] == "w": fields["live_syseng"]["read/write"] += 1 fields["total"]["read/write"] += 1 elif priv[1] == "r" and priv[2] != "w": fields["live_syseng"]["read"] += 1 fields["total"]["read"] += 1 elif priv[1] != "r" and priv[2] == "w": fields["live_syseng"]["write"] += 1 fields["total"]["write"] += 1 else: if priv[1] == "r" and priv[2] == "w": fields["static_syseng"]["read/write"] += 1 fields["total"]["read/write"] += 1 elif priv[1] == "r" and priv[2] != "w": fields["static_syseng"]["read"] += 1 fields["total"]["read"] += 1 elif priv[1] != "r" and priv[2] == "w": fields["static_syseng"]["write"] += 1 fields["total"]["write"] += 1 except Exception: p.kill() continue p.kill() else: # remove garbage processes if svc in exclude_list or svc in skip_list or svc.startswith("-") or svc.endswith("-") or svc[0].isdigit() or svc[-1].isdigit() or svc[0].isupper(): continue elif svc not in fields: fields[svc] = {"read/write": 0, "write": 0, "read": 0} try: p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE) p.stdout.readline() while True: line = p.stdout.readline().strip("\n").split() if not line: break else: priv = line[0] if priv[1] == "r" and priv[2] == "w": fields[svc]["read/write"] += 1 fields["total"]["read/write"] += 1 elif priv[1] == "r" and priv[2] != "w": fields[svc]["read"] += 1 fields["total"]["read"] += 1 elif priv[1] != "r" and priv[2] == "w": fields[svc]["write"] += 1 fields["total"]["write"] += 1 if fields[svc]["read/write"] == 0 and fields[svc]["read"] == 0 and fields[svc]["write"] == 0: del fields[svc] except Exception: p.kill() continue p.kill() for key in fields: influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", key, "read/write", fields[key]["read/write"], "write", fields[key]["write"], "read", fields[key]["read"]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["filestats"]) except KeyboardInterrupt: break except Exception: logging.error("filestats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectVswitch(influx_info, node, ci): """collects vshell information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("vswitch data starting collection with a collection interval of {}s".format(ci["vswitch"])) measurement = "vswitch" tags = OrderedDict([("node", node), ("engine", 0)]) tags1 = OrderedDict([("node", node), ("port", 0)]) tags2 = OrderedDict([("node", node), ("interface", 0)]) fields = OrderedDict([("cpuid", 0), ("rx_packets", 0), ("tx_packets", 0), ("rx_discard", 0), ("tx_discard", 0), ("tx_disabled", 0), ("tx_overflow", 0), ("tx_timeout", 0), ("usage", 0)]) fields1 = OrderedDict([("rx_packets", 0), ("tx_packets", 0), ("rx_bytes", 0), ("tx_bytes", 0), ("tx_errors", 0), ("rx_errors", 0), ("rx_nombuf", 0)]) fields2 = OrderedDict([("rx_packets", 0), ("tx_packets", 0), ("rx_bytes", 0), ("tx_bytes", 0), ("tx_errors", 0), ("rx_errors", 0), ("tx_discards", 0), ("rx_discards", 0), ("rx_floods", 0), ("rx_no_vlan", 0)]) vshell_engine_stats_output = vshell_port_stats_output = vshell_interface_stats_output = None influx_string = "" while True: try: vshell_engine_stats_output = Popen("vshell engine-stats-list", shell=True, stdout=PIPE) # skip first few lines vshell_engine_stats_output.stdout.readline() vshell_engine_stats_output.stdout.readline() vshell_engine_stats_output.stdout.readline() while True: line = vshell_engine_stats_output.stdout.readline().replace("|", "").split() if not line: break # skip lines like +++++++++++++++++++++++++++++ elif line[0].startswith("+"): continue else: # get info from output i = 2 tags["engine"] = line[1] for key in fields: fields[key] = line[i].strip("%") i += 1 influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags.keys())[0], list(tags.values())[0], list(tags.keys())[1], list(tags.values())[1], list(fields.keys())[0], list(fields.values())[0], list(fields.keys())[1], list(fields.values())[1], list(fields.keys())[2], list(fields.values())[2], list(fields.keys())[3], list(fields.values())[3], list(fields.keys())[4], list(fields.values())[4], list(fields.keys())[5], list(fields.values())[5], list(fields.keys())[6], list(fields.values())[6], list(fields.keys())[7], list(fields.values())[7], list(fields.keys())[8], list(fields.values())[8]) + "\n" vshell_engine_stats_output.kill() vshell_port_stats_output = Popen("vshell port-stats-list", shell=True, stdout=PIPE) vshell_port_stats_output.stdout.readline() vshell_port_stats_output.stdout.readline() vshell_port_stats_output.stdout.readline() while True: line = vshell_port_stats_output.stdout.readline().replace("|", "").split() if not line: break elif line[0].startswith("+"): continue else: i = 3 tags1["port"] = line[1] for key in fields1: fields1[key] = line[i].strip("%") i += 1 influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags1.keys())[0], list(tags1.values())[0], list(tags1.keys())[1], list(tags1.values())[1], list(fields1.keys())[0], list(fields1.values())[0], list(fields1.keys())[1], list(fields1.values())[1], list(fields1.keys())[2], list(fields1.values())[2], list(fields1.keys())[3], list(fields1.values())[3], list(fields1.keys())[4], list(fields1.values())[4], list(fields1.keys())[5], list(fields1.values())[5], list(fields1.keys())[6], list(fields1.values())[6]) + "\n" vshell_port_stats_output.kill() vshell_interface_stats_output = Popen("vshell interface-stats-list", shell=True, stdout=PIPE) vshell_interface_stats_output.stdout.readline() vshell_interface_stats_output.stdout.readline() vshell_interface_stats_output.stdout.readline() while True: line = vshell_interface_stats_output.stdout.readline().replace("|", "").split() if not line: break elif line[0].startswith("+"): continue else: if line[2] == "ethernet" and line[3].startswith("eth"): i = 4 tags2["interface"] = line[3] for key in fields2: fields2[key] = line[i].strip("%") i += 1 influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags2.keys())[0], list(tags2.values())[0], list(tags2.keys())[1], list(tags2.values())[1], list(fields2.keys())[0], list(fields2.values())[0], list(fields2.keys())[1], list(fields2.values())[1], list(fields2.keys())[2], list(fields2.values())[2], list(fields2.keys())[3], list(fields2.values())[3], list(fields2.keys())[4], list(fields2.values())[4], list(fields2.keys())[5], list(fields2.values())[5], list(fields2.keys())[6], list(fields2.values())[6], list(fields2.keys())[7], list(fields2.values())[7], list(fields2.keys())[8], list(fields2.values())[8], list(fields2.keys())[9], list(fields2.values())[9]) + "\n" else: continue vshell_interface_stats_output.kill() # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" time.sleep(ci["vswitch"]) except KeyboardInterrupt: if vshell_engine_stats_output is not None: vshell_engine_stats_output.kill() if vshell_port_stats_output is not None: vshell_port_stats_output.kill() if vshell_interface_stats_output is not None: vshell_interface_stats_output.kill() break except Exception: logging.error("vswitch collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectCpuCount(influx_info, node, ci): """collects the number of cores""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("cpu_count data starting collection with a collection interval of {}s".format(ci["cpu_count"])) measurement = "cpu_count" tags = {"node": node} while True: try: fields = {"cpu_count": cpu_count()} p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{},'{}'='{}' '{}'='{}''".format(influx_info[0], influx_info[1], influx_info[2], measurement, "node", tags["node"], "cpu_count", fields["cpu_count"]), shell=True) p.communicate() time.sleep(ci["cpu_count"]) except KeyboardInterrupt: break except Exception: logging.error("cpu_count collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) def collectApiStats(influx_info, node, ci, services, db_port, rabbit_port): logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("api_request data starting collection with a collection interval of {}s".format(ci["cpu_count"])) measurement = "api_requests" tags = {"node": node} influx_string = "" lsof_args = ['lsof', '-Pn', '-i', 'tcp'] while True: try: fields = {} lsof_result = Popen(lsof_args, shell=False, stdout=PIPE) lsof_lines = list() while True: line = lsof_result.stdout.readline().strip("\n") if not line: break lsof_lines.append(line) lsof_result.kill() for name, service in services.items(): pid_list = list() check_pid = False if name == "keystone-public": check_pid = True ps_result = Popen("pgrep -f --delimiter=' ' keystone-public", shell=True, stdout=PIPE) pid_list = ps_result.stdout.readline().strip().split(' ') ps_result.kill() elif name == "gnocchi-api": check_pid = True ps_result = Popen("pgrep -f --delimiter=' ' gnocchi-api", shell=True, stdout=PIPE) pid_list = ps_result.stdout.readline().strip().split(' ') ps_result.kill() api_count = 0 db_count = 0 rabbit_count = 0 for line in lsof_lines: if service['name'] is not None and service['name'] in line and (not check_pid or any(pid in line for pid in pid_list)): if service['api-port'] is not None and service['api-port'] in line: api_count += 1 elif db_port is not None and db_port in line: db_count += 1 elif rabbit_port is not None and rabbit_port in line: rabbit_count += 1 fields[name] = {"api": api_count, "db": db_count, "rabbit": rabbit_count} influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", name, "api", fields[name]["api"], "db", fields[name]["db"], "rabbit", fields[name]["rabbit"]) + "\n" p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" except KeyboardInterrupt: break except Exception: logging.error("api_request collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def getPlatformCores(node, cpe): """returns the cores dedicated to platform use""" if cpe is True or node.startswith("compute"): logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) core_list = list() try: with open("/etc/platform/worker_reserved.conf", "r") as f: for line in f: if line.startswith("PLATFORM_CPU_LIST"): core_list = line.split("=")[1].replace("\"", "").strip("\n").split(",") core_list = [int(x) for x in core_list] return core_list except Exception: logging.warning("skipping platform specific collection for {} due to error: {}".format(node, sys.exc_info())) return core_list else: return [] def isActiveController(): """determine if controller is active/standby""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) try: p = Popen("sm-dump", shell=True, stdout=PIPE) p.stdout.readline() p.stdout.readline() # read line for active/standby line = p.stdout.readline().strip("\n").split() per = line[1] p.kill() if per == "active": return True else: return False except Exception: if p is not None: p.kill() logging.error("sm-dump command could not be called properly. This is usually caused by a swact. Trying again on next call: {}".format(sys.exc_info())) return False def checkDuration(duration): """checks whether the duration param has been set. If set, sleep; then kill processes upon waking up""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) if duration is None: return None else: time.sleep(duration) print("Duration interval has ended. Killing processes now") logging.warning("Duration interval has ended. Killing processes now") raise KeyboardInterrupt def killProcesses(tasks): """kill all processes and log each death""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) for t in tasks: try: logging.info("{} data stopped collection".format(str(t.name))) t.terminate() except Exception: continue def createDB(influx_info, grafana_port, grafana_api_key): """create database in InfluxDB and add it to Grafana""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) p = None try: logging.info("Adding database to InfluxDB and Grafana") # create database in InfluxDB if not already created. Will NOT overwrite previous db p = Popen("curl -s -XPOST 'http://'{}':'{}'/query' --data-urlencode 'q=CREATE DATABASE {}'".format(influx_info[0], influx_info[1], influx_info[2]), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while creating the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("InfluxDB response: {}".format(response)) p.kill() # add database to Grafana grafana_db = '{"name":"%s", "type":"influxdb", "url":"http://%s:%s", "access":"proxy", "isDefault":false, "database":"%s"}' % (influx_info[2], influx_info[0], influx_info[1], influx_info[2]) p = Popen("curl -s 'http://{}:{}/api/datasources' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}' --data-binary '{}'".format(influx_info[0], grafana_port, grafana_api_key, grafana_db), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while creating the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("Grafana response: {}".format(response)) p.kill() except KeyboardInterrupt: if p is not None: p.kill() except Exception as e: print(str(e)) sys.exit(0) def deleteDB(influx_info, grafana_port, grafana_api_key): """delete database from InfluxDB and remove it from Grafana""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) p = None try: answer = str(input("\nAre you sure you would like to delete {}? (Y/N): ".format(influx_info[2]))).lower() except Exception: answer = None if answer is None or answer == "" or answer == "y" or answer == "yes": try: logging.info("Removing database from InfluxDB and Grafana") print("Removing database from InfluxDB and Grafana. Please wait...") # delete database from InfluxDB p = Popen("curl -s -XPOST 'http://'{}':'{}'/query' --data-urlencode 'q=DROP DATABASE {}'".format(influx_info[0], influx_info[1], influx_info[2]), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while removing the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("InfluxDB response: {}".format(response)) p.kill() # get database ID for db removal p = Popen("curl -s -G 'http://{}:{}/api/datasources/id/{}' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}'".format(influx_info[0], grafana_port, influx_info[2], grafana_api_key), shell=True, stdout=PIPE) id = p.stdout.read().split(":")[1].strip("}") if id == "": raise Exception("An error occurred while removing the database: Could not determine the database ID") p.kill() # remove database from Grafana p = Popen("curl -s -XDELETE 'http://{}:{}/api/datasources/{}' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}'".format(influx_info[0], grafana_port, id, grafana_api_key), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while removing the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("Grafana response: {}".format(response)) p.kill() except KeyboardInterrupt: if p is not None: p.kill() except Exception as e: print(str(e)) sys.exit(0) def appendToFile(file, content): """used for output log""" with open(file, "a") as f: fcntl.flock(f, fcntl.LOCK_EX) f.write(content + '\n') fcntl.flock(f, fcntl.LOCK_UN) # main program if __name__ == "__main__": # make sure user is root if os.geteuid() != 0: print("Must be run as root!\n") sys.exit(0) # initialize variables cpe_lab = False influx_ip = influx_port = influx_db = "" external_if = "" influx_info = list() grafana_port = "" grafana_api_key = "" controller_services = list() compute_services = list() storage_services = list() rabbit_services = list() common_services = list() services = {} live_svc = ("live_stream.py",) collection_intervals = {"memtop": None, "memstats": None, "occtop": None, "schedtop": None, "load_avg": None, "cpu_count": None, "diskstats": None, "iostat": None, "filestats": None, "netstats": None, "postgres": None, "rabbitmq": None, "vswitch": None} duration = None unconverted_duration = "" collect_api_requests = False api_requests = "" auto_delete_db = False delete_db = "" collect_all_services = False all_services = "" fast_postgres_connections = False fast_postgres = "" config = configparser.ConfigParser() node = os.popen("hostname").read().strip("\n") # get info from engtools.conf try: conf_file = "" if "engtools.conf" in tuple(os.listdir(os.getcwd())): conf_file = os.getcwd() + "/engtools.conf" elif "engtools.conf" in tuple(os.listdir("/etc/engtools/")): conf_file = "/etc/engtools/engtools.conf" config.read(conf_file) if config.get("LabConfiguration", "CPE_LAB").lower() == "y" or config.get("LabConfiguration", "CPE_LAB").lower() == "yes": cpe_lab = True if node.startswith("controller"): external_if = config.get("CollectInternal", "{}_EXTERNAL_INTERFACE".format(node.upper().replace("-", ""))) influx_ip = config.get("RemoteServer", "INFLUX_IP") influx_port = config.get("RemoteServer", "INFLUX_PORT") influx_db = config.get("RemoteServer", "INFLUX_DB") grafana_port = config.get("RemoteServer", "GRAFANA_PORT") grafana_api_key = config.get("RemoteServer", "GRAFANA_API_KEY") duration = config.get("LiveStream", "DURATION") unconverted_duration = config.get("LiveStream", "DURATION") api_requests = config.get("AdditionalOptions", "API_REQUESTS") delete_db = config.get("AdditionalOptions", "AUTO_DELETE_DB") all_services = config.get("AdditionalOptions", "ALL_SERVICES") fast_postgres = config.get("AdditionalOptions", "FAST_POSTGRES_CONNECTIONS") # additional options if api_requests.lower() == "y" or api_requests.lower() == "yes": collect_api_requests = True if delete_db.lower() == "y" or delete_db.lower() == "yes": auto_delete_db = True if all_services.lower() == "y" or all_services.lower() == "yes": collect_all_services = True if fast_postgres.lower() == "y" or fast_postgres.lower() == "yes": fast_postgres_connections = True # convert duration into seconds if duration == "": duration = None elif duration.endswith("s") or duration.endswith("S"): duration = duration.strip("s") duration = duration.strip("S") duration = int(duration) elif duration.endswith("m") or duration.endswith("M"): duration = duration.strip("m") duration = duration.strip("M") duration = int(duration) * 60 elif duration.endswith("h") or duration.endswith("H"): duration = duration.strip("h") duration = duration.strip("H") duration = int(duration) * 3600 elif duration.endswith("d") or duration.endswith("D"): duration = duration.strip("d") duration = duration.strip("D") duration = int(duration) * 3600 * 24 controller_services = tuple(config.get("ControllerServices", "CONTROLLER_SERVICE_LIST").split()) compute_services = tuple(config.get("ComputeServices", "COMPUTE_SERVICE_LIST").split()) storage_services = tuple(config.get("StorageServices", "STORAGE_SERVICE_LIST").split()) rabbit_services = tuple(config.get("RabbitmqServices", "RABBITMQ_QUEUE_LIST").split()) common_services = tuple(config.get("CommonServices", "COMMON_SERVICE_LIST").split()) static_svcs = tuple(config.get("StaticServices", "STATIC_SERVICE_LIST").split()) openstack_services = tuple(config.get("OpenStackServices", "OPEN_STACK_SERVICE_LIST").split()) skip_list = tuple(config.get("SkipList", "SKIP_LIST").split()) exclude_list = tuple(config.get("ExcludeList", "EXCLUDE_LIST").split()) # get collection intervals for i in config.options("Intervals"): if config.get("Intervals", i) == "" or config.get("Intervals", i) is None: collection_intervals[i] = None else: collection_intervals[i] = int(config.get("Intervals", i)) # get api-stats services DB_PORT_NUMBER = config.get("ApiStatsConstantPorts", "DB_PORT_NUMBER") RABBIT_PORT_NUMBER = config.get("ApiStatsConstantPorts", "RABBIT_PORT_NUMBER") SERVICES = OrderedDict() SERVICES_INFO = tuple(config.get("ApiStatsServices", "API_STATS_STRUCTURE").split('|')) for service_string in SERVICES_INFO: service_tuple = tuple(service_string.split(';')) if service_tuple[2] != "" and service_tuple[2] is not None: SERVICES[service_tuple[0]] = {'name': service_tuple[1], 'api-port': service_tuple[2]} else: SERVICES[service_tuple[0]] = {'name': service_tuple[1], 'api-port': None} except Exception: print("An error has occurred when parsing the engtools.conf configuration file: {}".format(sys.exc_info())) sys.exit(0) syseng_services = live_svc + static_svcs if cpe_lab is True: services["controller_services"] = controller_services + compute_services + storage_services + common_services else: controller_services += common_services compute_services += common_services storage_services += common_services services["controller_services"] = controller_services services["compute_services"] = compute_services services["storage_services"] = storage_services services["common_services"] = common_services services["syseng_services"] = syseng_services services["rabbit_services"] = rabbit_services influx_info.append(influx_ip) influx_info.append(influx_port) influx_info.append(influx_db) # add config options to log with open("/tmp/livestream.log", "w") as log_file: log_file.write("Configuration for {}:\n".format(node)) log_file.write("-InfluxDB address: {}:{}\n".format(influx_ip, influx_port)) log_file.write("-InfluxDB name: {}\n".format(influx_db)) log_file.write("-CPE lab: {}\n".format(str(cpe_lab))) log_file.write(("-Collect API requests: {}\n".format(str(collect_api_requests)))) log_file.write(("-Collect all services: {}\n".format(str(collect_all_services)))) log_file.write(("-Fast postgres connections: {}\n".format(str(fast_postgres_connections)))) log_file.write(("-Automatic database removal: {}\n".format(str(auto_delete_db)))) if duration is not None: log_file.write("-Live stream duration: {}\n".format(unconverted_duration)) log_file.close() # add POSTROUTING entry to NAT table if cpe_lab is False: # check controller-0 for NAT entry. If not there, add it if node.startswith("controller"): # use first interface if not specified in engtools.conf if external_if == "" or external_if is None: p = Popen("ifconfig", shell=True, stdout=PIPE) external_if = p.stdout.readline().decode().split(":")[0] p.kill() appendToFile("/tmp/livestream.log", "-External interface for {}: {}".format(node, external_if)) # enable IP forwarding p = Popen("sysctl -w net.ipv4.ip_forward=1 > /dev/null", shell=True) p.communicate() p = Popen("iptables -t nat -L --line-numbers", shell=True, stdout=PIPE) tmp = [line.decode().strip("\n") for line in p.stdout.readlines()] # entries need to be removed in reverse order for line in reversed(tmp): formatted_line = " ".join(line.strip("\n").split()[1:]) # if an entry already exists, remove it if formatted_line.startswith("MASQUERADE tcp -- anywhere"): line_number = line.strip("\n").split()[0] p1 = Popen("iptables -t nat -D POSTROUTING {}".format(line_number), shell=True) p1.communicate() p.kill() appendToFile("/tmp/livestream.log", "-Adding NAT information to allow compute/storage nodes to communicate with remote server\n") # add new entry for both InfluxDB and Grafana p = Popen("iptables -t nat -A POSTROUTING -p tcp -o {} -d {} --dport {} -j MASQUERADE".format(external_if, influx_ip, influx_port), shell=True) p.communicate() p = Popen("iptables -t nat -A POSTROUTING -p tcp -o {} -d {} --dport {} -j MASQUERADE".format(external_if, influx_ip, grafana_port), shell=True) p.communicate() appendToFile("/tmp/livestream.log", "\nStarting collection at {}\n".format(datetime.datetime.utcnow())) tasks = [] createDB(influx_info, grafana_port, grafana_api_key) try: node_type = str(node.split("-")[0]) # if not a standard node, run the common functions with collect_all enabled if node_type != "controller" and node_type != "compute" and node_type != "storage": node_type = "common" collect_all_services = True if collection_intervals["memstats"] is not None: p = Process(target=collectMemstats, args=(influx_info, node, collection_intervals, services["{}_services".format(node_type)], services["syseng_services"], openstack_services, exclude_list, skip_list, collect_all_services), name="memstats") tasks.append(p) p.start() if collection_intervals["schedtop"] is not None: p = Process(target=collectSchedtop, args=(influx_info, node, collection_intervals, services["{}_services".format(node_type)], services["syseng_services"], openstack_services, exclude_list, skip_list, collect_all_services), name="schedtop") tasks.append(p) p.start() if collection_intervals["filestats"] is not None: p = Process(target=collectFilestats, args=(influx_info, node, collection_intervals, services["{}_services".format(node_type)], services["syseng_services"], exclude_list, skip_list, collect_all_services), name="filestats") tasks.append(p) p.start() if collection_intervals["occtop"] is not None: p = Process(target=collectOcctop, args=(influx_info, node, collection_intervals, getPlatformCores(node, cpe_lab)), name="occtop") tasks.append(p) p.start() if collection_intervals["load_avg"] is not None: p = Process(target=collectLoadavg, args=(influx_info, node, collection_intervals), name="load_avg") tasks.append(p) p.start() if collection_intervals["cpu_count"] is not None: p = Process(target=collectCpuCount, args=(influx_info, node, collection_intervals), name="cpu_count") tasks.append(p) p.start() if collection_intervals["memtop"] is not None: p = Process(target=collectMemtop, args=(influx_info, node, collection_intervals), name="memtop") tasks.append(p) p.start() if collection_intervals["diskstats"] is not None: p = Process(target=collectDiskstats, args=(influx_info, node, collection_intervals), name="diskstats") tasks.append(p) p.start() if collection_intervals["iostat"] is not None: p = Process(target=collectIostat, args=(influx_info, node, collection_intervals), name="iostat") tasks.append(p) p.start() if collection_intervals["netstats"] is not None: p = Process(target=collectNetstats, args=(influx_info, node, collection_intervals), name="netstats") tasks.append(p) p.start() if collect_api_requests is True and node_type == "controller": p = Process(target=collectApiStats, args=(influx_info, node, collection_intervals, SERVICES, DB_PORT_NUMBER, RABBIT_PORT_NUMBER), name="api_requests") tasks.append(p) p.start() if node_type == "controller": if collection_intervals["postgres"] is not None: p = Process(target=collectPostgres, args=(influx_info, node, collection_intervals), name="postgres") tasks.append(p) p.start() p = Process(target=collectPostgresConnections, args=(influx_info, node, collection_intervals, fast_postgres_connections), name="postgres_connections") tasks.append(p) p.start() if collection_intervals["rabbitmq"] is not None: p = Process(target=collectRabbitMq, args=(influx_info, node, collection_intervals), name="rabbitmq") tasks.append(p) p.start() p = Process(target=collectRabbitMqSvc, args=(influx_info, node, collection_intervals, services["rabbit_services"]), name="rabbitmq_svc") tasks.append(p) p.start() if node_type == "compute" or cpe_lab is True: if collection_intervals["vswitch"] is not None: p = Process(target=collectVswitch, args=(influx_info, node, collection_intervals), name="vswitch") tasks.append(p) p.start() print("Sending data to InfluxDB. Please tail /tmp/livestream.log") checkDuration(duration) # give a small delay to ensure services have started time.sleep(3) for t in tasks: os.wait() except KeyboardInterrupt: pass finally: # end here once duration param has ended or ctrl-c is pressed appendToFile("/tmp/livestream.log", "\nEnding collection at {}\n".format(datetime.datetime.utcnow())) if tasks is not None and len(tasks) > 0: killProcesses(tasks) if auto_delete_db is True: deleteDB(influx_info, grafana_port, grafana_api_key) sys.exit(0)