#!/usr/bin/python3

"""
Copyright (c) 2017-2022 Wind River Systems, Inc.

SPDX-License-Identifier: Apache-2.0
"""

from collections import OrderedDict
import datetime
import fcntl
import logging
from multiprocessing import cpu_count
from multiprocessing import Process
import os
from subprocess import PIPE
from subprocess import Popen
import sys
import time

import psutil
from six.moves import configparser
from six.moves import input


def generateString(meas, tag_n, tag_v, field_n, field_v):
    """generates the required string for the areas where fields are not static"""
    base = "{},".format(meas)
    try:
        for i in range(len(tag_n)):
            if i == len(tag_n) - 1:
                # have space between tags and fields
                base += "'{}'='{}' ".format(tag_n[i], str(tag_v[i]))
            else:
                # separate with commas
                base += "'{}'='{}',".format(tag_n[i], str(tag_v[i]))
        for i in range(len(field_v)):
            if str(field_v[i]).replace(".", "").isdigit():
                if i == len(field_v) - 1:
                    base += "'{}'='{}'".format(field_n[i], str(field_v[i]))
                else:
                    base += "'{}'='{}',".format(field_n[i], str(field_v[i]))
        return base
    except IndexError:
        return None


def collectMemtop(influx_info, node, ci):
    """collects system memory information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("memtop data starting collection with a collection interval of {}s".format(ci["memtop"]))
    measurement = "memtop"
    tags = {"node": node}
    MiB = 1024.0
    while True:
        try:
            fields = OrderedDict([("total", 0), ("used", 0), ("free", 0), ("cached", 0), ("buf", 0), ("slab", 0), ("cas", 0), ("clim", 0), ("dirty", 0), ("wback", 0), ("anon", 0), ("avail", 0)])
            with open("/proc/meminfo", "r") as f:
                hps = 0
                # for each line in /proc/meminfo, match with element in fields
                for line in f:
                    line = line.strip("\n").split()
                    if line[0].strip(":").startswith("MemTotal"):
                        # convert from kibibytes to mebibytes
                        fields["total"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("MemFree"):
                        fields["free"] = int(line[1]) / MiB
                    elif line[0].strip(":").startswith("MemAvailable"):
                        fields["avail"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Buffers"):
                        fields["buf"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Cached"):
                        fields["cached"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Slab"):
                        fields["slab"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("CommitLimit"):
                        fields["clim"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Committed_AS"):
                        fields["cas"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Dirty"):
                        fields["dirty"] = float(line[1]) / MiB
                    elif line[0].strip(":").startswith("Writeback"):
                        fields["wback"] = float(line[1]) / MiB
                    elif line[0].strip(":").endswith("(anon)"):
                        fields["anon"] += float(line[1]) / MiB
                    elif line[0].strip(":").endswith("Hugepagesize"):
                        hps = float(line[1]) / MiB
                fields["used"] = fields["total"] - fields["avail"]
                f.close()
            # get platform specific memory info
            fields["platform_avail"] = 0
            fields["platform_hfree"] = 0
            for file in os.listdir("/sys/devices/system/node"):
                if file.startswith("node"):
                    node_num = file.replace("node", "").strip("\n")
                    avail = hfree = 0
                    with open("/sys/devices/system/node/{}/meminfo".format(file)) as f1:
                        for line in f1:
                            line = line.strip("\n").split()
                            if line[2].strip(":").startswith("MemFree") or line[2].strip(":").startswith("FilePages") or line[2].strip(":").startswith("SReclaimable"):
                                avail += float(line[3])
                            elif line[2].strip(":").startswith("HugePages_Free"):
                                hfree = float(line[3]) * hps
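                    # Per-NUMA availability estimate: "avail" sums this node's MemFree,
                    # FilePages and SReclaimable rows (still in kB here; divided by MiB just
                    # below), while "hfree" converts free hugepages using the Hugepagesize
                    # read earlier (hps is already in MiB, e.g. 512 free 2 MiB pages -> 1024).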
fields["{}:avail".format(node_num)] = avail / MiB fields["{}:hfree".format(node_num)] = hfree # get platform sum fields["platform_avail"] += avail / MiB fields["platform_hfree"] += hfree f1.close() s = generateString(measurement, list(tags.keys()), list(tags.values()), list(fields.keys()), list(fields.values())) if s is None: good_string = False else: good_string = True if good_string: # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], s), shell=True) p.communicate() time.sleep(ci["memtop"]) except KeyboardInterrupt: break except Exception: logging.error("memtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectMemstats(influx_info, node, ci, services, syseng_services, openstack_services, exclude_list, skip_list, collect_all): """collects rss and vsz information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("memstats data starting collection with a collection interval of {}s".format(ci["memstats"])) measurement = "memstats" tags = {"node": node} ps_output = None influx_string = "" while True: try: fields = {} ps_output = Popen("exec ps -e -o rss,vsz,cmd", shell=True, stdout=PIPE) # create dictionary of dictionaries if collect_all is False: for svc in services: fields[svc] = {"rss": 0, "vsz": 0} fields["static_syseng"] = {"rss": 0, "vsz": 0} fields["live_syseng"] = {"rss": 0, "vsz": 0} fields["total"] = {"rss": 0, "vsz": 0} ps_output.stdout.readline() while True: # for each line in ps output, get rss and vsz info line = ps_output.stdout.readline().strip("\n").split() # if at end of output, send data if not line: break else: rss = float(line[0]) vsz = float(line[1]) # go through all command outputs for i in range(2, len(line)): # remove unwanted characters and borders from cmd name. 
                        # remove unwanted characters and borders from cmd name. Ex: /usr/bin/example.py -> example.py
                        svc = line[i].replace("(", "").replace(")", "").strip(":").split("/")[-1].strip("\n")
                        if svc == "gunicorn":
                            gsvc = line[-1].replace("[", "").replace("]", "").strip("\n")
                            if gsvc == "public:application":
                                gsvc = "keystone-public"
                            elif gsvc == "admin:application":
                                gsvc = "keystone-admin"
                            gsvc = "gunicorn_{}".format(gsvc)
                            if gsvc not in fields:
                                fields[gsvc] = {"rss": rss, "vsz": vsz}
                            else:
                                fields[gsvc]["rss"] += rss
                                fields[gsvc]["vsz"] += vsz
                        elif svc == "postgres":
                            if (len(line) <= i + 2):
                                # Command line could be "sudo su postgres", skip it
                                break
                            if line[i + 1].startswith("-") is False and line[i + 1].startswith("_") is False and line[i + 1] != "psql":
                                psvc = ""
                                if line[i + 2] in openstack_services:
                                    psvc = line[i + 2].strip("\n")
                                else:
                                    for j in range(i + 1, len(line)):
                                        psvc += "{}_".format(line[j].strip("\n"))
                                psvc = "postgres_{}".format(psvc).strip("_")
                                if psvc not in fields:
                                    fields[psvc] = {"rss": rss, "vsz": vsz}
                                else:
                                    fields[psvc]["rss"] += rss
                                    fields[psvc]["vsz"] += vsz
                        if collect_all is False:
                            if svc in services:
                                fields[svc]["rss"] += rss
                                fields[svc]["vsz"] += vsz
                                fields["total"]["rss"] += rss
                                fields["total"]["vsz"] += vsz
                                break
                            elif svc in syseng_services:
                                if svc == "live_stream.py":
                                    fields["live_syseng"]["rss"] += rss
                                    fields["live_syseng"]["vsz"] += vsz
                                else:
                                    fields["static_syseng"]["rss"] += rss
                                    fields["static_syseng"]["vsz"] += vsz
                                fields["total"]["rss"] += rss
                                fields["total"]["vsz"] += vsz
                                break
                        # Collect all services
                        else:
                            if svc in exclude_list or svc.startswith("-") or svc[0].isdigit() or svc.startswith("[") or svc.endswith("]"):
                                continue
                            elif svc in skip_list or svc.startswith("IPaddr"):
                                break
                            else:
                                if svc not in fields:
                                    fields[svc] = {"rss": rss, "vsz": vsz}
                                else:
                                    fields[svc]["rss"] += rss
                                    fields[svc]["vsz"] += vsz
                                fields["total"]["rss"] += rss
                                fields["total"]["vsz"] += vsz
                                break
            # send data to InfluxDB
            for key in fields:
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", key, "rss", fields[key]["rss"], "vsz", fields[key]["vsz"]) + "\n"
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            ps_output.kill()
            time.sleep(ci["memstats"])
        except KeyboardInterrupt:
            if ps_output is not None:
                ps_output.kill()
            break
        except Exception:
            logging.error("memstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectSchedtop(influx_info, node, ci, services, syseng_services, openstack_services, exclude_list, skip_list, collect_all):
    """collects task cpu information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("schedtop data starting collection with a collection interval of {}s".format(ci["schedtop"]))
    measurement = "schedtop"
    tags = {"node": node}
    influx_string = ""
    top_output = Popen("exec top -b -c -w 512 -d{}".format(ci["schedtop"]), shell=True, stdout=PIPE)
    while True:
        try:
            fields = {}
            pro = psutil.Process(top_output.pid)
            # if process dies, restart it
            if pro.status() == "zombie":
                top_output.kill()
                top_output = Popen("exec top -b -c -w 512 -d{}".format(ci["schedtop"]), shell=True, stdout=PIPE)
            if collect_all is False:
                for svc in services:
                    fields[svc] = 0
                fields["static_syseng"] = 0
                fields["live_syseng"] = 0
            fields["total"] = 0
            # check first line
            line = top_output.stdout.readline()
            if not line:
                pass
            else:
                # skip header completely
                for _ in range(6):
                    top_output.stdout.readline()
                while True:
                    line = top_output.stdout.readline().strip("\n").split()
                    # if end of top output, leave this while loop
                    if not line:
                        break
                    else:
                        occ = float(line[8])
                        # for each command listed, check if it matches one from the list
                        for i in range(11, len(line)):
                            # remove unwanted characters and borders from cmd name. Ex: /usr/bin/example.py -> example.py
                            svc = line[i].replace("(", "").replace(")", "").strip(":").split("/")[-1]
                            if svc == "gunicorn":
                                gsvc = line[-1].replace("[", "").replace("]", "").strip("\n")
                                if gsvc == "public:application":
                                    gsvc = "keystone-public"
                                elif gsvc == "admin:application":
                                    gsvc = "keystone-admin"
                                gsvc = "gunicorn_{}".format(gsvc)
                                if gsvc not in fields:
                                    fields[gsvc] = occ
                                else:
                                    fields[gsvc] += occ
                            elif svc == "postgres":
                                if (len(line) <= i + 2):
                                    # Command line could be "sudo su postgres", skip it
                                    break
                                if line[i + 1].startswith("-") is False and line[i + 1].startswith("_") is False and line[i + 1] != "psql":
                                    psvc = ""
                                    if line[i + 2] in openstack_services:
                                        psvc = line[i + 2].strip("\n")
                                    else:
                                        for j in range(i + 1, len(line)):
                                            psvc += "{}_".format(line[j].strip("\n"))
                                    psvc = "postgres_{}".format(psvc).strip("_")
                                    if psvc not in fields:
                                        fields[psvc] = occ
                                    else:
                                        fields[psvc] += occ
                            if collect_all is False:
                                if svc in services:
                                    fields[svc] += occ
                                    fields["total"] += occ
                                    break
                                elif svc in syseng_services:
                                    if svc == "live_stream.py":
                                        fields["live_syseng"] += occ
                                    else:
                                        fields["static_syseng"] += occ
                                    fields["total"] += occ
                                    break
                            # Collect all services
                            else:
                                if svc in exclude_list or svc.startswith("-") or svc[0].isdigit() or svc.startswith("[") or svc.endswith("]"):
                                    continue
                                elif svc in skip_list or svc.startswith("IPaddr"):
                                    break
                                else:
                                    if svc not in fields:
                                        fields[svc] = occ
                                    else:
                                        fields[svc] += occ
                                    fields["total"] += occ
                                    break
            for key in fields:
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", key, "occ", fields[key]) + "\n"
            # send data to InfluxDB
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            time.sleep(ci["schedtop"])
        except KeyboardInterrupt:
            if top_output is not None:
                top_output.kill()
            break
        except Exception:
            logging.error("schedtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectDiskstats(influx_info, node, ci):
    """collects disk utilization information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("diskstats data starting collection with a collection interval of {}s".format(ci["diskstats"]))
    measurement = "diskstats"
    tags = {"node": node, "file_system": None, "type": None, "mount": None}
    fields = {"size": 0, "used": 0, "avail": 0, "usage": 0}
    influx_string = ""
    while True:
        try:
            parts = psutil.disk_partitions()
            # gather all partitions
            for i in parts:
                tags["mount"] = str(i[1]).split("/")[-1]
                # if mount == '', call it root
                if tags["mount"] == "":
                    tags["mount"] = "root"
                # skip boot
                elif tags["mount"] == "boot":
                    continue
                tags["file_system"] = str(i[0]).split("/")[-1]
                tags["type"] = i[2]
                u = psutil.disk_usage(i[1])
                fields["size"] = u[0]
                fields["used"] = u[1]
                fields["avail"] = u[2]
                fields["usage"] = u[3]
                influx_string += "{},'{}'='{}','{}'='{}','{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "file_system", tags["file_system"], "type", tags["type"], "mount", tags["mount"], "size", fields["size"], "used", fields["used"], "avail", fields["avail"], "usage", fields["usage"]) + "\n"
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            time.sleep(ci["diskstats"])
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("diskstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectIostat(influx_info, node, ci):
    """collect device I/O information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("iostat data starting collection with a collection interval of {}s".format(ci["iostat"]))
    measurement = "iostat"
    tags = {"node": node}
    sector_size = 512.0
    influx_string = ""
    while True:
        try:
            fields = {}
            tmp = {}
            tmp1 = {}
            start = time.time()
            # get initial values
            for dev in os.listdir("/sys/block/"):
                if dev.startswith("sr"):
                    continue
                else:
                    fields[dev] = {"r/s": 0, "w/s": 0, "io/s": 0, "rkB/s": 0, "wkB/s": 0, "rrqms/s": 0, "wrqms/s": 0, "util": 0}
                    tmp[dev] = {"init_reads": 0, "init_reads_merged": 0, "init_read_sectors": 0, "init_read_wait": 0, "init_writes": 0, "init_writes_merged": 0, "init_write_sectors": 0, "init_write_wait": 0, "init_io_progress": 0, "init_io_time": 0, "init_wait_time": 0}
                    with open("/sys/block/{}/stat".format(dev), "r") as f:
                        # get initial readings
                        line = f.readline().strip("\n").split()
                        tmp[dev]["init_reads"] = int(line[0])
                        tmp[dev]["init_reads_merged"] = int(line[1])
                        tmp[dev]["init_read_sectors"] = int(line[2])
                        tmp[dev]["init_read_wait"] = int(line[3])
                        tmp[dev]["init_writes"] = int(line[4])
                        tmp[dev]["init_writes_merged"] = int(line[5])
                        tmp[dev]["init_write_sectors"] = int(line[6])
                        tmp[dev]["init_write_wait"] = int(line[7])
                        tmp[dev]["init_io_progress"] = int(line[8])
                        tmp[dev]["init_io_time"] = int(line[9])
                        tmp[dev]["init_wait_time"] = int(line[10])
            time.sleep(ci["iostat"])
            dt = time.time() - start
            # get values again
            for dev in os.listdir("/sys/block/"):
                if dev.startswith("sr"):
                    continue
                else:
                    # during a swact, some devices may not have been read in the initial reading. If found now, add them to dict
                    if dev not in fields:
                        fields[dev] = {"r/s": 0, "w/s": 0, "io/s": 0, "rkB/s": 0, "wkB/s": 0, "rrqms/s": 0, "wrqms/s": 0, "util": 0}
                    tmp1[dev] = {"reads": 0, "reads_merged": 0, "read_sectors": 0, "read_wait": 0, "writes": 0, "writes_merged": 0, "write_sectors": 0, "write_wait": 0, "io_progress": 0, "io_time": 0, "wait_time": 0}
                    with open("/sys/block/{}/stat".format(dev), "r") as f:
                        line = f.readline().strip("\n").split()
                        tmp1[dev]["reads"] = int(line[0])
                        tmp1[dev]["reads_merged"] = int(line[1])
                        tmp1[dev]["read_sectors"] = int(line[2])
                        tmp1[dev]["read_wait"] = int(line[3])
                        tmp1[dev]["writes"] = int(line[4])
                        tmp1[dev]["writes_merged"] = int(line[5])
                        tmp1[dev]["write_sectors"] = int(line[6])
                        tmp1[dev]["write_wait"] = int(line[7])
                        tmp1[dev]["io_progress"] = int(line[8])
                        tmp1[dev]["io_time"] = int(line[9])
                        tmp1[dev]["wait_time"] = int(line[10])
            # take difference and divide by delta t
            for key in fields:
                # if device was found in initial and second reading, do calculation
                if key in tmp and key in tmp1:
                    fields[key]["r/s"] = abs(tmp1[key]["reads"] - tmp[key]["init_reads"]) / dt
                    fields[key]["w/s"] = abs(tmp1[key]["writes"] - tmp[key]["init_writes"]) / dt
                    fields[key]["rkB/s"] = abs(tmp1[key]["read_sectors"] - tmp[key]["init_read_sectors"]) * sector_size / dt / 1000
                    fields[key]["wkB/s"] = abs(tmp1[key]["write_sectors"] - tmp[key]["init_write_sectors"]) * sector_size / dt / 1000
                    fields[key]["rrqms/s"] = abs(tmp1[key]["reads_merged"] - tmp[key]["init_reads_merged"]) / dt
                    fields[key]["wrqms/s"] = abs(tmp1[key]["writes_merged"] - tmp[key]["init_writes_merged"]) / dt
                    fields[key]["io/s"] = fields[key]["r/s"] + fields[key]["w/s"] + fields[key]["rrqms/s"] + fields[key]["wrqms/s"]
                    fields[key]["util"] = abs(tmp1[key]["io_time"] - tmp[key]["init_io_time"]) / dt / 10
                    influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "device", key, "r/s", fields[key]["r/s"], "w/s", fields[key]["w/s"], "rkB/s", fields[key]["rkB/s"], "wkB/s", fields[key]["wkB/s"], "rrqms/s", fields[key]["rrqms/s"], "wrqms/s", fields[key]["wrqms/s"], "io/s", fields[key]["io/s"], "util", fields[key]["util"]) + "\n"
            # send data to InfluxDB
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("iostat collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectLoadavg(influx_info, node, ci):
    """collects cpu load average information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("load_avg data starting collection with a collection interval of {}s".format(ci["load_avg"]))
    measurement = "load_avg"
    tags = {"node": node}
    fields = {"load_avg": 0}
    while True:
        try:
            fields["load_avg"] = os.getloadavg()[0]
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{},'{}'='{}' '{}'='{}''".format(influx_info[0], influx_info[1], influx_info[2], measurement, "node", tags["node"], "load_avg", fields["load_avg"]), shell=True)
            p.communicate()
            time.sleep(ci["load_avg"])
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("load_avg collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectOcctop(influx_info, node, ci, pc):
    """collects cpu utilization information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("occtop data starting collection with a collection interval of {}s".format(ci["occtop"]))
    measurement = "occtop"
    tags = {"node": node}
    platform_cores = pc
    influx_string = ""
    while True:
        try:
            cpu = psutil.cpu_percent(percpu=True)
            cpu_times = psutil.cpu_times_percent(percpu=True)
            fields = {}
            # sum all cpu percents
            total = float(sum(cpu))
            sys_total = 0
            fields["platform_total"] = {"usage": 0, "system": 0}
            cores = 0
            # for each core, get values and assign a tag
            for el in cpu:
                fields["usage"] = float(el)
                fields["system"] = float(cpu_times[cores][2])
                sys_total += float(cpu_times[cores][2])
                tags["core"] = "core_{}".format(cores)
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", tags["core"], "usage", fields["usage"], "system", fields["system"]) + "\n"
                if len(platform_cores) > 0:
                    if cores in platform_cores:
                        fields["platform_total"]["usage"] += float(el)
                        fields["platform_total"]["system"] += float(cpu_times[cores][2])
                cores += 1
            # add usage and system total to influx string
            if len(platform_cores) > 0:
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", "platform_total", "usage", fields["platform_total"]["usage"], "system", fields["platform_total"]["system"]) + "\n"
            influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "core", "total", "usage", total, "system", sys_total) + "\n"
            # send data to Influx
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            time.sleep(ci["occtop"])
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("occtop collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectNetstats(influx_info, node, ci):
    """collects network interface information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("netstats data starting collection with a collection interval of {}s".format(ci["netstats"]))
    measurement = "netstats"
    tags = {"node": node}
    fields = {}
    prev_fields = {}
    Mbps = float(1000000 / 8)
    influx_string = ""
    while True:
        try:
            net = psutil.net_io_counters(pernic=True)
            # get initial data for difference calculation
            for key in net:
                prev_fields[key] = {"tx_B": net[key][0], "rx_B": net[key][1], "tx_p": net[key][2], "rx_p": net[key][3]}
            start = time.time()
            time.sleep(ci["netstats"])
            net = psutil.net_io_counters(pernic=True)
            # get new data for difference calculation
            dt = time.time() - start
            for key in net:
                tx_B = (float(net[key][0]) - float(prev_fields[key]["tx_B"]))
                tx_Mbps = tx_B / Mbps / dt
                rx_B = (float(net[key][1]) - float(prev_fields[key]["rx_B"]))
                rx_Mbps = rx_B / Mbps / dt
                tx_pps = (float(net[key][2]) - float(prev_fields[key]["tx_p"])) / dt
                rx_pps = (float(net[key][3]) - float(prev_fields[key]["rx_p"])) / dt
                # ensure no division by zero
                if rx_B > 0 and rx_pps > 0:
                    rx_packet_size = rx_B / rx_pps
                else:
                    rx_packet_size = 0
                if tx_B > 0 and tx_pps > 0:
                    tx_packet_size = tx_B / tx_pps
                else:
                    tx_packet_size = 0
                fields[key] = {"tx_mbps": tx_Mbps, "rx_mbps": rx_Mbps, "tx_pps": tx_pps, "rx_pps": rx_pps, "tx_packet_size": tx_packet_size, "rx_packet_size": rx_packet_size}
            for key in fields:
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "interface", key, "rx_mbps", fields[key]["rx_mbps"], "tx_mbps", fields[key]["tx_mbps"], "rx_pps", fields[key]["rx_pps"], "tx_pps", fields[key]["tx_pps"], "rx_packet_size", fields[key]["rx_packet_size"], "tx_packet_size", fields[key]["tx_packet_size"]) + "\n"
            # send data to InfluxDB
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("netstats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectPostgres(influx_info, node, ci):
    """collects postgres db size and postgres service size information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("postgres data starting collection with a collection interval of {}s".format(ci["postgres"]))
    measurement = "postgres_db_size"
    measurement1 = "postgres_svc_stats"
    tags = {"node": node, "service": None, "table_schema": 0, "table": None}
    fields = {"db_size": 0, "connections": 0}
    fields1 = {"table_size": 0, "total_size": 0, "index_size": 0, "live_tuples": 0, "dead_tuples": 0}
    postgres_output = postgres_output1 = None
    influx_string = influx_string1 = ""
    good_string = False
    dbcount = 0
    BATCH_SIZE = 10
    while True:
        try:
            # make sure this is active controller, otherwise postgres queries won't work
            if isActiveController():
                while True:
                    postgres_output = Popen("sudo -u postgres psql --pset pager=off -q -t -c'SELECT datname, pg_database_size(datname) FROM pg_database WHERE datistemplate = false;'", shell=True, stdout=PIPE)
                    db_lines = postgres_output.stdout.read().replace(" ", "").strip().split("\n")
                    if db_lines == "" or db_lines is None:
                        postgres_output.kill()
                        break
                    else:
                        # for each database from the previous output
                        for line in db_lines:
                            if not line:
                                break
                            line = line.replace(" ", "").split("|")
                            tags["service"] = line[0]
                            fields["db_size"] = line[1]
                            # send DB size to InfluxDB
                            influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "db_size", fields["db_size"]) + "\n"
                            # get tables for each database
                            sql = "SELECT table_schema,table_name,pg_size_pretty(table_size) AS table_size,pg_size_pretty(indexes_size) AS indexes_size,pg_size_pretty(total_size) AS total_size,live_tuples,dead_tuples FROM (SELECT table_schema,table_name,pg_table_size(table_name) AS table_size,pg_indexes_size(table_name) AS indexes_size,pg_total_relation_size(table_name) AS total_size,pg_stat_get_live_tuples(table_name::regclass) AS live_tuples,pg_stat_get_dead_tuples(table_name::regclass) AS dead_tuples FROM (SELECT table_schema,table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE') AS all_tables ORDER BY total_size DESC) AS pretty_sizes;"
                            postgres_output1 = Popen('sudo -u postgres psql --pset pager=off -q -t -d{} -c"{}"'.format(line[0], sql), shell=True, stdout=PIPE)
                            tbl_lines = postgres_output1.stdout.read().replace(" ", "").strip().split("\n")
                            for line in tbl_lines:
                                if line == "":
                                    continue
                                else:
                                    line = line.replace(" ", "").split("|")
                                    elements = list()
                                    # ensures all data is present
                                    if len(line) != 7:
                                        good_string = False
                                        break
                                    else:
                                        # do some conversions
                                        for el in line:
                                            if el.endswith("bytes"):
                                                el = int(el.replace("bytes", ""))
                                            elif el.endswith("kB"):
                                                el = el.replace("kB", "")
                                                el = int(el) * 1000
                                            elif el.endswith("MB"):
                                                el = el.replace("MB", "")
                                                el = int(el) * 1000000
                                            elif el.endswith("GB"):
                                                el = el.replace("GB", "")
                                                el = int(el) * 1000000000
                                            elements.append(el)
                                        tags["table_schema"] = elements[0]
                                        tags["table"] = elements[1]
                                        fields1["table_size"] = int(elements[2])
                                        fields1["index_size"] = int(elements[3])
                                        fields1["total_size"] = int(elements[4])
                                        fields1["live_tuples"] = int(elements[5])
                                        fields1["dead_tuples"] = int(elements[6])
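                                        # queue one entry per table, using the same quoted
                                        # key='value' formatting this script sends everywhere else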
"table_schema", tags["table_schema"], "table", tags["table"], "table_size", fields1["table_size"], "index_size", fields1["index_size"], "total_size", fields1["total_size"], "live_tuples", fields1["live_tuples"], "dead_tuples", fields1["dead_tuples"]) + "\n" good_string = True dbcount += 1 if dbcount == BATCH_SIZE and good_string: # Curl will barf if the batch is too large p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string1), shell=True) p.communicate() influx_string1 = "" dbcount = 0 if good_string: # send table data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string1), shell=True) p.communicate() influx_string = influx_string1 = "" dbcount = 0 time.sleep(ci["postgres"]) postgres_output1.kill() postgres_output.kill() else: time.sleep(20) except KeyboardInterrupt: if postgres_output is not None: postgres_output.kill() if postgres_output1 is not None: postgres_output1.kill() break except Exception: logging.error("postgres collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectPostgresConnections(influx_info, node, ci, fast): """collect postgres connections information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) if fast: logging.info("postgres_connections data starting collection with a constant collection interval") else: logging.info("postgres_connections data starting collection with a collection interval of {}s".format(ci["postgres"])) measurement = "postgres_connections" tags = {"node": node, "service": None, "state": None} connections_output = None influx_string = "" while True: try: # make sure this is active controller, otherwise postgres queries wont work if isActiveController(): while True: fields = {} # outputs a list of postgres dbs and their connections connections_output = Popen("sudo -u postgres psql --pset pager=off -q -c 'SELECT datname,state,count(*) from pg_stat_activity group by datname,state;'", shell=True, stdout=PIPE) line = connections_output.stdout.readline() if line == "" or line is None: break # skip header connections_output.stdout.readline() while True: line = connections_output.stdout.readline().strip("\n") if not line: break else: line = line.replace(" ", "").split("|") if len(line) != 3: continue else: svc = line[0] connections = int(line[2]) tags["service"] = svc if svc not in fields: fields[svc] = {"active": 0, "idle": 0, "other": 0} if line[1] == "active": fields[svc]["active"] = connections elif line[1] == "idle": fields[svc]["idle"] = connections else: fields[svc]["other"] = connections influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", "active", "connections", fields[svc]["active"]) + "\n" influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", "idle", "connections", fields[svc]["idle"]) + "\n" influx_string += "{},'{}'='{}','{}'='{}','{}'='{}' '{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "state", 
"other", "connections", fields[svc]["other"]) + "\n" # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True) p.communicate() influx_string = "" connections_output.kill() if fast: pass else: time.sleep(ci["postgres"]) else: time.sleep(20) except KeyboardInterrupt: if connections_output is not None: connections_output.kill() break except Exception: logging.error("postgres_connections collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info())) time.sleep(3) def collectRabbitMq(influx_info, node, ci): """collects rabbitmq information""" logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) logging.info("rabbitmq data starting collection with a collection interval of {}s".format(ci["rabbitmq"])) measurement = "rabbitmq" tags = OrderedDict([("node", node)]) rabbitmq_output = None while True: try: # make sure this is active controller, otherwise rabbit queries wont work if isActiveController(): while True: fields = OrderedDict([]) rabbitmq_output = Popen("sudo rabbitmqctl -n rabbit@localhost status", shell=True, stdout=PIPE) # needed data starts where output = '{memory,[' line = rabbitmq_output.stdout.readline() # if no data is returned, exit if line == "" or line is None: rabbitmq_output.kill() break else: line = rabbitmq_output.stdout.read().strip("\n").split("{memory,[") if len(line) != 2: rabbitmq_output.kill() break else: # remove brackets from data info = line[1].replace(" ", "").replace("{", "").replace("}", "").replace("\n", "").replace("[", "").replace("]", "").split(",") for i in range(len(info) - 3): if info[i].endswith("total"): info[i] = info[i].replace("total", "memory_total") # some data needs string manipulation if info[i].startswith("clustering") or info[i].startswith("amqp"): info[i] = "listeners_" + info[i] if info[i].startswith("total_"): info[i] = "descriptors_" + info[i] if info[i].startswith("limit") or info[i].startswith("used"): info[i] = "processes_" + info[i] if info[i].replace("_", "").isalpha() and info[i + 1].isdigit(): fields[info[i]] = info[i + 1] s = generateString(measurement, list(tags.keys()), list(tags.values()), list(fields.keys()), list(fields.values())) if s is None: rabbitmq_output.kill() else: # send data to InfluxDB p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], s), shell=True) p.communicate() time.sleep(ci["rabbitmq"]) rabbitmq_output.kill() else: time.sleep(20) except KeyboardInterrupt: if rabbitmq_output is not None: rabbitmq_output.kill() break except Exception: logging.error("rabbitmq collection stopped unexpectedly with error: {}. 
            time.sleep(3)


def collectRabbitMqSvc(influx_info, node, ci, services):
    """collects rabbitmq messaging information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("rabbitmq_svc data starting collection with a collection interval of {}s".format(ci["rabbitmq"]))
    measurement = "rabbitmq_svc"
    tags = {"node": node, "service": None}
    fields = {"messages": 0, "messages_ready": 0, "messages_unacknowledged": 0, "memory": 0, "consumers": 0}
    rabbitmq_svc_output = None
    good_string = False
    influx_string = ""
    while True:
        try:
            # make sure this is active controller, otherwise rabbit queries won't work
            if isActiveController():
                while True:
                    rabbitmq_svc_output = Popen("sudo rabbitmqctl -n rabbit@localhost list_queues name messages messages_ready messages_unacknowledged memory consumers", shell=True, stdout=PIPE)
                    # if no data is returned, exit
                    if rabbitmq_svc_output.stdout.readline() == "" or rabbitmq_svc_output.stdout.readline() is None:
                        rabbitmq_svc_output.kill()
                        break
                    else:
                        for line in rabbitmq_svc_output.stdout:
                            line = line.split()
                            if not line:
                                break
                            else:
                                if len(line) != 6:
                                    good_string = False
                                    break
                                else:
                                    # read line and fill fields
                                    if line[0] in services:
                                        tags["service"] = line[0]
                                        fields["messages"] = line[1]
                                        fields["messages_ready"] = line[2]
                                        fields["messages_unacknowledged"] = line[3]
                                        fields["memory"] = line[4]
                                        fields["consumers"] = line[5]
                                        influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", tags["service"], "messages", fields["messages"], "messages_ready", fields["messages_ready"], "messages_unacknowledged", fields["messages_unacknowledged"], "memory", fields["memory"], "consumers", fields["consumers"]) + "\n"
                                        good_string = True
                        if good_string:
                            # send data to InfluxDB
                            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
                            p.communicate()
                            influx_string = ""
                        time.sleep(ci["rabbitmq"])
                        rabbitmq_svc_output.kill()
            else:
                time.sleep(20)
        except KeyboardInterrupt:
            if rabbitmq_svc_output is not None:
                rabbitmq_svc_output.kill()
            break
        except Exception:
            logging.error("rabbitmq_svc collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectFilestats(influx_info, node, ci, services, syseng_services, exclude_list, skip_list, collect_all):
    """collects open file information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("filestats data starting collection with a collection interval of {}s".format(ci["filestats"]))
    measurement = "filestats"
    tags = {"node": node}
    influx_string = ""
    while True:
        try:
            fields = {}
            # fill dict with services from engtools.conf
            if collect_all is False:
                for svc in services:
                    fields[svc] = {"read/write": 0, "write": 0, "read": 0}
                fields["static_syseng"] = {"read/write": 0, "write": 0, "read": 0}
                fields["live_syseng"] = {"read/write": 0, "write": 0, "read": 0}
            fields["total"] = {"read/write": 0, "write": 0, "read": 0}
            for process in os.listdir("/proc/"):
                if process.isdigit():
                    # sometimes the process dies before reading its info
                    try:
                        svc = psutil.Process(int(process)).name()
                        svc = svc.split()[0].replace("(", "").replace(")", "").strip(":").split("/")[-1]
                    except Exception:
                        continue
                    if collect_all is False:
                        if svc in services:
                            try:
                                p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE)
                                p.stdout.readline()
                                while True:
                                    line = p.stdout.readline().strip("\n").split()
                                    if not line:
                                        break
                                    else:
                                        priv = line[0]
                                        if priv[1] == "r" and priv[2] == "w":
                                            fields[svc]["read/write"] += 1
                                            fields["total"]["read/write"] += 1
                                        elif priv[1] == "r" and priv[2] != "w":
                                            fields[svc]["read"] += 1
                                            fields["total"]["read"] += 1
                                        elif priv[1] != "r" and priv[2] == "w":
                                            fields[svc]["write"] += 1
                                            fields["total"]["write"] += 1
                            except Exception:
                                p.kill()
                                continue
                            p.kill()
                        elif svc in syseng_services:
                            try:
                                p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE)
                                p.stdout.readline()
                                while True:
                                    line = p.stdout.readline().strip("\n").split()
                                    if not line:
                                        break
                                    else:
                                        priv = line[0]
                                        if svc == "live_stream.py":
                                            if priv[1] == "r" and priv[2] == "w":
                                                fields["live_syseng"]["read/write"] += 1
                                                fields["total"]["read/write"] += 1
                                            elif priv[1] == "r" and priv[2] != "w":
                                                fields["live_syseng"]["read"] += 1
                                                fields["total"]["read"] += 1
                                            elif priv[1] != "r" and priv[2] == "w":
                                                fields["live_syseng"]["write"] += 1
                                                fields["total"]["write"] += 1
                                        else:
                                            if priv[1] == "r" and priv[2] == "w":
                                                fields["static_syseng"]["read/write"] += 1
                                                fields["total"]["read/write"] += 1
                                            elif priv[1] == "r" and priv[2] != "w":
                                                fields["static_syseng"]["read"] += 1
                                                fields["total"]["read"] += 1
                                            elif priv[1] != "r" and priv[2] == "w":
                                                fields["static_syseng"]["write"] += 1
                                                fields["total"]["write"] += 1
                            except Exception:
                                p.kill()
                                continue
                            p.kill()
                    else:
                        # remove garbage processes
                        if svc in exclude_list or svc in skip_list or svc.startswith("-") or svc.endswith("-") or svc[0].isdigit() or svc[-1].isdigit() or svc[0].isupper():
                            continue
                        elif svc not in fields:
                            fields[svc] = {"read/write": 0, "write": 0, "read": 0}
                        try:
                            p = Popen("ls -l /proc/{}/fd".format(process), shell=True, stdout=PIPE)
                            p.stdout.readline()
                            while True:
                                line = p.stdout.readline().strip("\n").split()
                                if not line:
                                    break
                                else:
                                    priv = line[0]
                                    if priv[1] == "r" and priv[2] == "w":
                                        fields[svc]["read/write"] += 1
                                        fields["total"]["read/write"] += 1
                                    elif priv[1] == "r" and priv[2] != "w":
                                        fields[svc]["read"] += 1
                                        fields["total"]["read"] += 1
                                    elif priv[1] != "r" and priv[2] == "w":
                                        fields[svc]["write"] += 1
                                        fields["total"]["write"] += 1
                            if fields[svc]["read/write"] == 0 and fields[svc]["read"] == 0 and fields[svc]["write"] == 0:
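                                # drop services that had no open file descriptors this pass so they
                                # are not written to InfluxDB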
                                del fields[svc]
                        except Exception:
                            p.kill()
                            continue
                        p.kill()
            for key in fields:
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", key, "read/write", fields[key]["read/write"], "write", fields[key]["write"], "read", fields[key]["read"]) + "\n"
            # send data to InfluxDB
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            time.sleep(ci["filestats"])
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("filestats collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectVswitch(influx_info, node, ci):
    """collects vshell information"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("vswitch data starting collection with a collection interval of {}s".format(ci["vswitch"]))
    measurement = "vswitch"
    tags = OrderedDict([("node", node), ("engine", 0)])
    tags1 = OrderedDict([("node", node), ("port", 0)])
    tags2 = OrderedDict([("node", node), ("interface", 0)])
    fields = OrderedDict([("cpuid", 0), ("rx_packets", 0), ("tx_packets", 0), ("rx_discard", 0), ("tx_discard", 0), ("tx_disabled", 0), ("tx_overflow", 0), ("tx_timeout", 0), ("usage", 0)])
    fields1 = OrderedDict([("rx_packets", 0), ("tx_packets", 0), ("rx_bytes", 0), ("tx_bytes", 0), ("tx_errors", 0), ("rx_errors", 0), ("rx_nombuf", 0)])
    fields2 = OrderedDict([("rx_packets", 0), ("tx_packets", 0), ("rx_bytes", 0), ("tx_bytes", 0), ("tx_errors", 0), ("rx_errors", 0), ("tx_discards", 0), ("rx_discards", 0), ("rx_floods", 0), ("rx_no_vlan", 0)])
    vshell_engine_stats_output = vshell_port_stats_output = vshell_interface_stats_output = None
    influx_string = ""
    while True:
        try:
            vshell_engine_stats_output = Popen("vshell engine-stats-list", shell=True, stdout=PIPE)
            # skip first few lines
            vshell_engine_stats_output.stdout.readline()
            vshell_engine_stats_output.stdout.readline()
            vshell_engine_stats_output.stdout.readline()
            while True:
                line = vshell_engine_stats_output.stdout.readline().replace("|", "").split()
                if not line:
                    break
                # skip lines like +++++++++++++++++++++++++++++
                elif line[0].startswith("+"):
                    continue
                else:
                    # get info from output
                    i = 2
                    tags["engine"] = line[1]
                    for key in fields:
                        fields[key] = line[i].strip("%")
                        i += 1
                    influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags.keys())[0], list(tags.values())[0], list(tags.keys())[1], list(tags.values())[1], list(fields.keys())[0], list(fields.values())[0], list(fields.keys())[1], list(fields.values())[1], list(fields.keys())[2], list(fields.values())[2], list(fields.keys())[3], list(fields.values())[3], list(fields.keys())[4], list(fields.values())[4], list(fields.keys())[5], list(fields.values())[5], list(fields.keys())[6], list(fields.values())[6], list(fields.keys())[7], list(fields.values())[7], list(fields.keys())[8], list(fields.values())[8]) + "\n"
            vshell_engine_stats_output.kill()
            vshell_port_stats_output = Popen("vshell port-stats-list", shell=True, stdout=PIPE)
            vshell_port_stats_output.stdout.readline()
            vshell_port_stats_output.stdout.readline()
            vshell_port_stats_output.stdout.readline()
            while True:
                line = vshell_port_stats_output.stdout.readline().replace("|", "").split()
                if not line:
                    break
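                # table border rows such as +----+ are skipped here, the same way the
                # engine-stats loop above handles them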
                elif line[0].startswith("+"):
                    continue
                else:
                    i = 3
                    tags1["port"] = line[1]
                    for key in fields1:
                        fields1[key] = line[i].strip("%")
                        i += 1
                    influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags1.keys())[0], list(tags1.values())[0], list(tags1.keys())[1], list(tags1.values())[1], list(fields1.keys())[0], list(fields1.values())[0], list(fields1.keys())[1], list(fields1.values())[1], list(fields1.keys())[2], list(fields1.values())[2], list(fields1.keys())[3], list(fields1.values())[3], list(fields1.keys())[4], list(fields1.values())[4], list(fields1.keys())[5], list(fields1.values())[5], list(fields1.keys())[6], list(fields1.values())[6]) + "\n"
            vshell_port_stats_output.kill()
            vshell_interface_stats_output = Popen("vshell interface-stats-list", shell=True, stdout=PIPE)
            vshell_interface_stats_output.stdout.readline()
            vshell_interface_stats_output.stdout.readline()
            vshell_interface_stats_output.stdout.readline()
            while True:
                line = vshell_interface_stats_output.stdout.readline().replace("|", "").split()
                if not line:
                    break
                elif line[0].startswith("+"):
                    continue
                else:
                    if line[2] == "ethernet" and line[3].startswith("eth"):
                        i = 4
                        tags2["interface"] = line[3]
                        for key in fields2:
                            fields2[key] = line[i].strip("%")
                            i += 1
                        influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}','{}'='{}'".format(measurement, list(tags2.keys())[0], list(tags2.values())[0], list(tags2.keys())[1], list(tags2.values())[1], list(fields2.keys())[0], list(fields2.values())[0], list(fields2.keys())[1], list(fields2.values())[1], list(fields2.keys())[2], list(fields2.values())[2], list(fields2.keys())[3], list(fields2.values())[3], list(fields2.keys())[4], list(fields2.values())[4], list(fields2.keys())[5], list(fields2.values())[5], list(fields2.keys())[6], list(fields2.values())[6], list(fields2.keys())[7], list(fields2.values())[7], list(fields2.keys())[8], list(fields2.values())[8], list(fields2.keys())[9], list(fields2.values())[9]) + "\n"
                    else:
                        continue
            vshell_interface_stats_output.kill()
            # send data to InfluxDB
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
            time.sleep(ci["vswitch"])
        except KeyboardInterrupt:
            if vshell_engine_stats_output is not None:
                vshell_engine_stats_output.kill()
            if vshell_port_stats_output is not None:
                vshell_port_stats_output.kill()
            if vshell_interface_stats_output is not None:
                vshell_interface_stats_output.kill()
            break
        except Exception:
            logging.error("vswitch collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def collectCpuCount(influx_info, node, ci):
    """collects the number of cores"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("cpu_count data starting collection with a collection interval of {}s".format(ci["cpu_count"]))
    measurement = "cpu_count"
    tags = {"node": node}
    while True:
        try:
            fields = {"cpu_count": cpu_count()}
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{},'{}'='{}' '{}'='{}''".format(influx_info[0], influx_info[1], influx_info[2], measurement, "node", tags["node"], "cpu_count", fields["cpu_count"]), shell=True)
            p.communicate()
            time.sleep(ci["cpu_count"])
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("cpu_count collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))


def collectApiStats(influx_info, node, ci, services, db_port, rabbit_port):
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    logging.info("api_request data starting collection with a collection interval of {}s".format(ci["cpu_count"]))
    measurement = "api_requests"
    tags = {"node": node}
    influx_string = ""
    lsof_args = ['lsof', '-Pn', '-i', 'tcp']
    while True:
        try:
            fields = {}
            lsof_result = Popen(lsof_args, shell=False, stdout=PIPE)
            lsof_lines = list()
            while True:
                line = lsof_result.stdout.readline().strip("\n")
                if not line:
                    break
                lsof_lines.append(line)
            lsof_result.kill()
            for name, service in services.items():
                pid_list = list()
                check_pid = False
                if name == "keystone-public":
                    check_pid = True
                    ps_result = Popen("pgrep -f --delimiter=' ' keystone-public", shell=True, stdout=PIPE)
                    pid_list = ps_result.stdout.readline().strip().split(' ')
                    ps_result.kill()
                elif name == "gnocchi-api":
                    check_pid = True
                    ps_result = Popen("pgrep -f --delimiter=' ' gnocchi-api", shell=True, stdout=PIPE)
                    pid_list = ps_result.stdout.readline().strip().split(' ')
                    ps_result.kill()
                api_count = 0
                db_count = 0
                rabbit_count = 0
                for line in lsof_lines:
                    if service['name'] is not None and service['name'] in line and (not check_pid or any(pid in line for pid in pid_list)):
                        if service['api-port'] is not None and service['api-port'] in line:
                            api_count += 1
                        elif db_port is not None and db_port in line:
                            db_count += 1
                        elif rabbit_port is not None and rabbit_port in line:
                            rabbit_count += 1
                fields[name] = {"api": api_count, "db": db_count, "rabbit": rabbit_count}
                influx_string += "{},'{}'='{}','{}'='{}' '{}'='{}','{}'='{}','{}'='{}'".format(measurement, "node", tags["node"], "service", name, "api", fields[name]["api"], "db", fields[name]["db"], "rabbit", fields[name]["rabbit"]) + "\n"
            p = Popen("curl -s -o /dev/null 'http://'{}':'{}'/write?db='{}'' --data-binary '{}'".format(influx_info[0], influx_info[1], influx_info[2], influx_string), shell=True)
            p.communicate()
            influx_string = ""
        except KeyboardInterrupt:
            break
        except Exception:
            logging.error("api_request collection stopped unexpectedly with error: {}. Restarting process...".format(sys.exc_info()))
            time.sleep(3)


def getPlatformCores(node, cpe):
    """returns the cores dedicated to platform use"""
    if cpe is True or node.startswith("compute"):
        logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
        core_list = list()
        try:
            with open("/etc/platform/worker_reserved.conf", "r") as f:
                for line in f:
                    if line.startswith("PLATFORM_CPU_LIST"):
                        core_list = line.split("=")[1].replace("\"", "").strip("\n").split(",")
                        core_list = [int(x) for x in core_list]
            return core_list
        except Exception:
            logging.warning("skipping platform specific collection for {} due to error: {}".format(node, sys.exc_info()))
            return core_list
    else:
        return []


def isActiveController():
    """determine if controller is active/standby"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    try:
        p = Popen("sm-dump", shell=True, stdout=PIPE)
        p.stdout.readline()
        p.stdout.readline()
        # read line for active/standby
        line = p.stdout.readline().strip("\n").split()
        per = line[1]
        p.kill()
        if per == "active":
            return True
        else:
            return False
    except Exception:
        if p is not None:
            p.kill()
        logging.error("sm-dump command could not be called properly. This is usually caused by a swact. Trying again on next call: {}".format(sys.exc_info()))
        return False


def checkDuration(duration):
    """checks whether the duration param has been set. If set, sleep; then kill processes upon waking up"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    if duration is None:
        return None
    else:
        time.sleep(duration)
        print("Duration interval has ended. Killing processes now")
        logging.warning("Duration interval has ended. Killing processes now")
        raise KeyboardInterrupt


def killProcesses(tasks):
    """kill all processes and log each death"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    for t in tasks:
        try:
            logging.info("{} data stopped collection".format(str(t.name)))
            t.terminate()
        except Exception:
            continue


def createDB(influx_info, grafana_port, grafana_api_key):
    """create database in InfluxDB and add it to Grafana"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    p = None
    try:
        logging.info("Adding database to InfluxDB and Grafana")
        # create database in InfluxDB if not already created. Will NOT overwrite previous db
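        # Example of the request this builds (placeholders shown; the real address, port and
        # database name come from engtools.conf):
        #   curl -s -XPOST 'http://<influx_ip>:<influx_port>/query' --data-urlencode 'q=CREATE DATABASE <influx_db>'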
        p = Popen("curl -s -XPOST 'http://'{}':'{}'/query' --data-urlencode 'q=CREATE DATABASE {}'".format(influx_info[0], influx_info[1], influx_info[2]), shell=True, stdout=PIPE)
        response = p.stdout.read().strip("\n")
        if response == "":
            raise Exception("An error occurred while creating the database: Please make sure the Grafana and InfluxDB services are running")
        else:
            logging.info("InfluxDB response: {}".format(response))
        p.kill()
        # add database to Grafana
        grafana_db = '{"name":"%s", "type":"influxdb", "url":"http://%s:%s", "access":"proxy", "isDefault":false, "database":"%s"}' % (influx_info[2], influx_info[0], influx_info[1], influx_info[2])
        p = Popen("curl -s 'http://{}:{}/api/datasources' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}' --data-binary '{}'".format(influx_info[0], grafana_port, grafana_api_key, grafana_db), shell=True, stdout=PIPE)
        response = p.stdout.read().strip("\n")
        if response == "":
            raise Exception("An error occurred while creating the database: Please make sure the Grafana and InfluxDB services are running")
        else:
            logging.info("Grafana response: {}".format(response))
        p.kill()
    except KeyboardInterrupt:
        if p is not None:
            p.kill()
    except Exception as e:
        print(str(e))
        sys.exit(0)


def deleteDB(influx_info, grafana_port, grafana_api_key):
    """delete database from InfluxDB and remove it from Grafana"""
    logging.basicConfig(filename="/tmp/livestream.log", filemode="a", format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
    p = None
    try:
        answer = str(input("\nAre you sure you would like to delete {}? (Y/N): ".format(influx_info[2]))).lower()
    except Exception:
        answer = None
    if answer is None or answer == "" or answer == "y" or answer == "yes":
        try:
            logging.info("Removing database from InfluxDB and Grafana")
            print("Removing database from InfluxDB and Grafana. Please wait...")
Please wait...") # delete database from InfluxDB p = Popen("curl -s -XPOST 'http://'{}':'{}'/query' --data-urlencode 'q=DROP DATABASE {}'".format(influx_info[0], influx_info[1], influx_info[2]), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while removing the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("InfluxDB response: {}".format(response)) p.kill() # get database ID for db removal p = Popen("curl -s -G 'http://{}:{}/api/datasources/id/{}' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}'".format(influx_info[0], grafana_port, influx_info[2], grafana_api_key), shell=True, stdout=PIPE) id = p.stdout.read().split(":")[1].strip("}") if id == "": raise Exception("An error occurred while removing the database: Could not determine the database ID") p.kill() # remove database from Grafana p = Popen("curl -s -XDELETE 'http://{}:{}/api/datasources/{}' -H 'Accept: application/json' -H 'Content-Type: application/json' -H 'Authorization: Bearer {}'".format(influx_info[0], grafana_port, id, grafana_api_key), shell=True, stdout=PIPE) response = p.stdout.read().strip("\n") if response == "": raise Exception("An error occurred while removing the database: Please make sure the Grafana and InfluxDB services are running") else: logging.info("Grafana response: {}".format(response)) p.kill() except KeyboardInterrupt: if p is not None: p.kill() except Exception as e: print(str(e)) sys.exit(0) def appendToFile(file, content): """used for output log""" with open(file, "a") as f: fcntl.flock(f, fcntl.LOCK_EX) f.write(content + '\n') fcntl.flock(f, fcntl.LOCK_UN) # main program if __name__ == "__main__": # make sure user is root if os.geteuid() != 0: print("Must be run as root!\n") sys.exit(0) # initialize variables cpe_lab = False influx_ip = influx_port = influx_db = "" external_if = "" influx_info = list() grafana_port = "" grafana_api_key = "" controller_services = list() compute_services = list() storage_services = list() rabbit_services = list() common_services = list() services = {} live_svc = ("live_stream.py",) collection_intervals = {"memtop": None, "memstats": None, "occtop": None, "schedtop": None, "load_avg": None, "cpu_count": None, "diskstats": None, "iostat": None, "filestats": None, "netstats": None, "postgres": None, "rabbitmq": None, "vswitch": None} duration = None unconverted_duration = "" collect_api_requests = False api_requests = "" auto_delete_db = False delete_db = "" collect_all_services = False all_services = "" fast_postgres_connections = False fast_postgres = "" config = configparser.ConfigParser() node = os.popen("hostname").read().strip("\n") # get info from engtools.conf try: conf_file = "" if "engtools.conf" in tuple(os.listdir(os.getcwd())): conf_file = os.getcwd() + "/engtools.conf" elif "engtools.conf" in tuple(os.listdir("/etc/engtools/")): conf_file = "/etc/engtools/engtools.conf" config.read(conf_file) if config.get("LabConfiguration", "CPE_LAB").lower() == "y" or config.get("LabConfiguration", "CPE_LAB").lower() == "yes": cpe_lab = True if node.startswith("controller"): external_if = config.get("CollectInternal", "{}_EXTERNAL_INTERFACE".format(node.upper().replace("-", ""))) influx_ip = config.get("RemoteServer", "INFLUX_IP") influx_port = config.get("RemoteServer", "INFLUX_PORT") influx_db = config.get("RemoteServer", "INFLUX_DB") grafana_port = config.get("RemoteServer", "GRAFANA_PORT") grafana_api_key = 
config.get("RemoteServer", "GRAFANA_API_KEY") duration = config.get("LiveStream", "DURATION") unconverted_duration = config.get("LiveStream", "DURATION") api_requests = config.get("AdditionalOptions", "API_REQUESTS") delete_db = config.get("AdditionalOptions", "AUTO_DELETE_DB") all_services = config.get("AdditionalOptions", "ALL_SERVICES") fast_postgres = config.get("AdditionalOptions", "FAST_POSTGRES_CONNECTIONS") # additional options if api_requests.lower() == "y" or api_requests.lower() == "yes": collect_api_requests = True if delete_db.lower() == "y" or delete_db.lower() == "yes": auto_delete_db = True if all_services.lower() == "y" or all_services.lower() == "yes": collect_all_services = True if fast_postgres.lower() == "y" or fast_postgres.lower() == "yes": fast_postgres_connections = True # convert duration into seconds if duration == "": duration = None elif duration.endswith("s") or duration.endswith("S"): duration = duration.strip("s") duration = duration.strip("S") duration = int(duration) elif duration.endswith("m") or duration.endswith("M"): duration = duration.strip("m") duration = duration.strip("M") duration = int(duration) * 60 elif duration.endswith("h") or duration.endswith("H"): duration = duration.strip("h") duration = duration.strip("H") duration = int(duration) * 3600 elif duration.endswith("d") or duration.endswith("D"): duration = duration.strip("d") duration = duration.strip("D") duration = int(duration) * 3600 * 24 controller_services = tuple(config.get("ControllerServices", "CONTROLLER_SERVICE_LIST").split()) compute_services = tuple(config.get("ComputeServices", "COMPUTE_SERVICE_LIST").split()) storage_services = tuple(config.get("StorageServices", "STORAGE_SERVICE_LIST").split()) rabbit_services = tuple(config.get("RabbitmqServices", "RABBITMQ_QUEUE_LIST").split()) common_services = tuple(config.get("CommonServices", "COMMON_SERVICE_LIST").split()) static_svcs = tuple(config.get("StaticServices", "STATIC_SERVICE_LIST").split()) openstack_services = tuple(config.get("OpenStackServices", "OPEN_STACK_SERVICE_LIST").split()) skip_list = tuple(config.get("SkipList", "SKIP_LIST").split()) exclude_list = tuple(config.get("ExcludeList", "EXCLUDE_LIST").split()) # get collection intervals for i in config.options("Intervals"): if config.get("Intervals", i) == "" or config.get("Intervals", i) is None: collection_intervals[i] = None else: collection_intervals[i] = int(config.get("Intervals", i)) # get api-stats services DB_PORT_NUMBER = config.get("ApiStatsConstantPorts", "DB_PORT_NUMBER") RABBIT_PORT_NUMBER = config.get("ApiStatsConstantPorts", "RABBIT_PORT_NUMBER") SERVICES = OrderedDict() SERVICES_INFO = tuple(config.get("ApiStatsServices", "API_STATS_STRUCTURE").split('|')) for service_string in SERVICES_INFO: service_tuple = tuple(service_string.split(';')) if service_tuple[2] != "" and service_tuple[2] is not None: SERVICES[service_tuple[0]] = {'name': service_tuple[1], 'api-port': service_tuple[2]} else: SERVICES[service_tuple[0]] = {'name': service_tuple[1], 'api-port': None} except Exception: print("An error has occurred when parsing the engtools.conf configuration file: {}".format(sys.exc_info())) sys.exit(0) syseng_services = live_svc + static_svcs if cpe_lab is True: services["controller_services"] = controller_services + compute_services + storage_services + common_services else: controller_services += common_services compute_services += common_services storage_services += common_services services["controller_services"] = controller_services 
services["compute_services"] = compute_services services["storage_services"] = storage_services services["common_services"] = common_services services["syseng_services"] = syseng_services services["rabbit_services"] = rabbit_services influx_info.append(influx_ip) influx_info.append(influx_port) influx_info.append(influx_db) # add config options to log with open("/tmp/livestream.log", "w") as log_file: log_file.write("Configuration for {}:\n".format(node)) log_file.write("-InfluxDB address: {}:{}\n".format(influx_ip, influx_port)) log_file.write("-InfluxDB name: {}\n".format(influx_db)) log_file.write("-CPE lab: {}\n".format(str(cpe_lab))) log_file.write(("-Collect API requests: {}\n".format(str(collect_api_requests)))) log_file.write(("-Collect all services: {}\n".format(str(collect_all_services)))) log_file.write(("-Fast postgres connections: {}\n".format(str(fast_postgres_connections)))) log_file.write(("-Automatic database removal: {}\n".format(str(auto_delete_db)))) if duration is not None: log_file.write("-Live stream duration: {}\n".format(unconverted_duration)) log_file.close() # add POSTROUTING entry to NAT table if cpe_lab is False: # check controller-0 for NAT entry. If not there, add it if node.startswith("controller"): # use first interface if not specified in engtools.conf if external_if == "" or external_if is None: p = Popen("ifconfig", shell=True, stdout=PIPE) external_if = p.stdout.readline().decode().split(":")[0] p.kill() appendToFile("/tmp/livestream.log", "-External interface for {}: {}".format(node, external_if)) # enable IP forwarding p = Popen("sysctl -w net.ipv4.ip_forward=1 > /dev/null", shell=True) p.communicate() p = Popen("iptables -t nat -L --line-numbers", shell=True, stdout=PIPE) tmp = [line.decode().strip("\n") for line in p.stdout.readlines()] # entries need to be removed in reverse order for line in reversed(tmp): formatted_line = " ".join(line.strip("\n").split()[1:]) # if an entry already exists, remove it if formatted_line.startswith("MASQUERADE tcp -- anywhere"): line_number = line.strip("\n").split()[0] p1 = Popen("iptables -t nat -D POSTROUTING {}".format(line_number), shell=True) p1.communicate() p.kill() appendToFile("/tmp/livestream.log", "-Adding NAT information to allow compute/storage nodes to communicate with remote server\n") # add new entry for both InfluxDB and Grafana p = Popen("iptables -t nat -A POSTROUTING -p tcp -o {} -d {} --dport {} -j MASQUERADE".format(external_if, influx_ip, influx_port), shell=True) p.communicate() p = Popen("iptables -t nat -A POSTROUTING -p tcp -o {} -d {} --dport {} -j MASQUERADE".format(external_if, influx_ip, grafana_port), shell=True) p.communicate() appendToFile("/tmp/livestream.log", "\nStarting collection at {}\n".format(datetime.datetime.utcnow())) tasks = [] createDB(influx_info, grafana_port, grafana_api_key) try: node_type = str(node.split("-")[0]) # if not a standard node, run the common functions with collect_all enabled if node_type != "controller" and node_type != "compute" and node_type != "storage": node_type = "common" collect_all_services = True if collection_intervals["memstats"] is not None: p = Process(target=collectMemstats, args=(influx_info, node, collection_intervals, services["{}_services".format(node_type)], services["syseng_services"], openstack_services, exclude_list, skip_list, collect_all_services), name="memstats") tasks.append(p) p.start() if collection_intervals["schedtop"] is not None: p = Process(target=collectSchedtop, args=(influx_info, node, collection_intervals, 
services["{}_services".format(node_type)], services["syseng_services"], openstack_services, exclude_list, skip_list, collect_all_services), name="schedtop") tasks.append(p) p.start() if collection_intervals["filestats"] is not None: p = Process(target=collectFilestats, args=(influx_info, node, collection_intervals, services["{}_services".format(node_type)], services["syseng_services"], exclude_list, skip_list, collect_all_services), name="filestats") tasks.append(p) p.start() if collection_intervals["occtop"] is not None: p = Process(target=collectOcctop, args=(influx_info, node, collection_intervals, getPlatformCores(node, cpe_lab)), name="occtop") tasks.append(p) p.start() if collection_intervals["load_avg"] is not None: p = Process(target=collectLoadavg, args=(influx_info, node, collection_intervals), name="load_avg") tasks.append(p) p.start() if collection_intervals["cpu_count"] is not None: p = Process(target=collectCpuCount, args=(influx_info, node, collection_intervals), name="cpu_count") tasks.append(p) p.start() if collection_intervals["memtop"] is not None: p = Process(target=collectMemtop, args=(influx_info, node, collection_intervals), name="memtop") tasks.append(p) p.start() if collection_intervals["diskstats"] is not None: p = Process(target=collectDiskstats, args=(influx_info, node, collection_intervals), name="diskstats") tasks.append(p) p.start() if collection_intervals["iostat"] is not None: p = Process(target=collectIostat, args=(influx_info, node, collection_intervals), name="iostat") tasks.append(p) p.start() if collection_intervals["netstats"] is not None: p = Process(target=collectNetstats, args=(influx_info, node, collection_intervals), name="netstats") tasks.append(p) p.start() if collect_api_requests is True and node_type == "controller": p = Process(target=collectApiStats, args=(influx_info, node, collection_intervals, SERVICES, DB_PORT_NUMBER, RABBIT_PORT_NUMBER), name="api_requests") tasks.append(p) p.start() if node_type == "controller": if collection_intervals["postgres"] is not None: p = Process(target=collectPostgres, args=(influx_info, node, collection_intervals), name="postgres") tasks.append(p) p.start() p = Process(target=collectPostgresConnections, args=(influx_info, node, collection_intervals, fast_postgres_connections), name="postgres_connections") tasks.append(p) p.start() if collection_intervals["rabbitmq"] is not None: p = Process(target=collectRabbitMq, args=(influx_info, node, collection_intervals), name="rabbitmq") tasks.append(p) p.start() p = Process(target=collectRabbitMqSvc, args=(influx_info, node, collection_intervals, services["rabbit_services"]), name="rabbitmq_svc") tasks.append(p) p.start() if node_type == "compute" or cpe_lab is True: if collection_intervals["vswitch"] is not None: p = Process(target=collectVswitch, args=(influx_info, node, collection_intervals), name="vswitch") tasks.append(p) p.start() print("Sending data to InfluxDB. Please tail /tmp/livestream.log") checkDuration(duration) # give a small delay to ensure services have started time.sleep(3) for t in tasks: os.wait() except KeyboardInterrupt: pass finally: # end here once duration param has ended or ctrl-c is pressed appendToFile("/tmp/livestream.log", "\nEnding collection at {}\n".format(datetime.datetime.utcnow())) if tasks is not None and len(tasks) > 0: killProcesses(tasks) if auto_delete_db is True: deleteDB(influx_info, grafana_port, grafana_api_key) sys.exit(0)