Charles Short 2ba39b30ab Refresh collectd for "train"
This commit does several things at once:

- Use ansible_distribution_major_version to detect which version of the
  EPEL repository to use, so we don't have to hard-code the URL for
  either epel7 or epel8.
- Remove the "stein" workaround for the collectd-openstack role. The
  workaround has been dropped in favor of running the collectd daemon
  in a podman container.
- Drop opendaylight support for collectd since it is no longer
  supported.
- Add the collectd playbook so we can run collectd in a CentOS 7
  container going forward for "train". This still needs to be tested
  on "stein", but it should work there as well.
- Add browbeat-containers to tox.ini for flake8
- Simplify detection of docker or podman for older versions of OSP.
(sai)
- Fix typo from compute_compute to collectd_compute that caused failures
  on compute nodes
- Clear graphite_host in install/group_vars/all.yml
- Move the container Dockerfiles into the browbeat tree
- Conditionally copy required Dockerfiles to node instead of git clone
- Fix up some log file paths
- Use Docker/Podman depending on release
- Provide a single interface (collectd.yml) containing the container and
  baremetal playbooks
- Introduce the collectd_container variable in install/group_vars/all
- Remove unneeded SELinux relabelling when running the container (it
  already runs as privileged)
- Remove unneeded hostfs mount
- Log the collectd container to a file instead of STDOUT for easier
  debugging
- Add the collectd-ping package to the collectd-openstack Dockerfile
- Improve docs to reflect these changes
- Dynamically set the rabbitmq and swift log paths for the tail plugin

Co-Authored-By: Sai Sindhur Malleni <smalleni@redhat.com>

Change-Id: I627a696f6f1240d96a0e1d85c26d59bbbfae2b1b
Signed-off-by: Charles Short <chucks@redhat.com>
Signed-off-by: Sai Sindhur Malleni <smalleni@redhat.com>
2019-11-05 08:08:37 -05:00

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collectd python plugin to read ceph storage stats from ceph command line for
an OpenStack Cloud.
"""
import collectd
import json
import os
import subprocess
import time
import traceback
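
# Example collectd configuration for this plugin (illustrative; the module
# path and import name are assumptions and must match wherever this file is
# installed). The keys correspond to configure_callback below:
#
#   <LoadPlugin python>
#     Globals true
#   </LoadPlugin>
#   <Plugin python>
#     ModulePath "/usr/local/bin/"
#     Import "collectd_ceph_storage"
#     <Module "collectd_ceph_storage">
#       CephCluster "ceph"
#       CephMONStats true
#       CephMONStatsInterval 10
#       CephOSDStats true
#       CephOSDStatsInterval 10
#       CephPGStats true
#       CephPGStatsInterval 10
#       CephPoolStats true
#       CephPoolStatsInterval 10
#       CephRadosBench false
#     </Module>
#   </Plugin>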
class CollectdCephStorage(object):
    def __init__(self):
        self.ceph_cluster = None
        self.ceph_rados_bench = False
        self.ceph_rados_bench_interval = 60
        self.ceph_mon_stats = False
        self.ceph_mon_stats_interval = 10
        self.ceph_osd_stats = False
        self.ceph_osd_stats_interval = 10
        self.ceph_pg_stats = False
        self.ceph_pg_stats_interval = 10
        self.ceph_pool_stats = False
        self.ceph_pool_stats_interval = 10

    def configure_callback(self, config):
        for node in config.children:
            val = str(node.values[0])
            if node.key == 'CephRadosBench':
                self.ceph_rados_bench = val in ['True', 'true']
            elif node.key == 'CephMONStats':
                self.ceph_mon_stats = val in ['True', 'true']
            elif node.key == 'CephOSDStats':
                self.ceph_osd_stats = val in ['True', 'true']
            elif node.key == 'CephPGStats':
                self.ceph_pg_stats = val in ['True', 'true']
            elif node.key == 'CephPoolStats':
                self.ceph_pool_stats = val in ['True', 'true']
            elif node.key == 'CephCluster':
                self.ceph_cluster = val
            elif node.key == 'CephRadosBenchInterval':
                self.ceph_rados_bench_interval = int(float(val))
            elif node.key == 'CephMONStatsInterval':
                self.ceph_mon_stats_interval = int(float(val))
            elif node.key == 'CephOSDStatsInterval':
                self.ceph_osd_stats_interval = int(float(val))
            elif node.key == 'CephPGStatsInterval':
                self.ceph_pg_stats_interval = int(float(val))
            elif node.key == 'CephPoolStatsInterval':
                self.ceph_pool_stats_interval = int(float(val))
            else:
                collectd.warning(
                    'collectd-ceph-storage: Unknown config key: {}'
                    .format(node.key))

        if not self.ceph_cluster:
            collectd.warning('collectd-ceph-storage: CephCluster Undefined')

        if self.ceph_rados_bench:
            collectd.info('Registered Ceph Rados Bench')
            collectd.register_read(
                self.read_ceph_rados_bench,
                self.ceph_rados_bench_interval, name='ceph-rados-bench')

        if self.ceph_mon_stats:
            collectd.info('Registered Ceph Mon')
            collectd.register_read(
                self.read_ceph_mon, self.ceph_mon_stats_interval,
                name='ceph-monitor')

        if self.ceph_osd_stats:
            collectd.info('Registered Ceph OSD')
            collectd.register_read(
                self.read_ceph_osd, self.ceph_osd_stats_interval,
                name='ceph-osd')

        if self.ceph_pg_stats:
            collectd.info('Registered Ceph PG')
            collectd.register_read(
                self.read_ceph_pg, self.ceph_pg_stats_interval, name='ceph-pg')

        if self.ceph_pool_stats:
            collectd.info('Registered Ceph Pool')
            collectd.register_read(
                self.read_ceph_pool, self.ceph_pool_stats_interval,
                name='ceph-pool')

    def dispatch_value(self, plugin_instance, type_instance, value, interval):
        metric = collectd.Values()
        metric.plugin = 'collectd-ceph-storage'
        metric.interval = interval
        metric.type = 'gauge'
        metric.plugin_instance = plugin_instance
        metric.type_instance = type_instance
        metric.values = [value]
        metric.dispatch()
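
    # A value dispatched above is identified by collectd as
    #   <host>/collectd-ceph-storage-<plugin_instance>/gauge-<type_instance>
    # e.g. (host and pool names illustrative):
    #   overcloud-ceph-0/collectd-ceph-storage-pool-vms/gauge-read_bytes_sec
    # and whatever write plugin is loaded (e.g. write_graphite) stores the
    # metric under roughly that path.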

    def read_ceph_rados_bench(self):
        """Runs "rados bench" and collects latencies reported."""
        # The argument list below is joined into one string and run through a
        # shell (check_output=False), so it is equivalent to:
        #   timeout 30s rados -p rbd bench 10 write -t 1 -b 65536 \
        #     2>/dev/null | grep -i latency | awk '{print 1000*$3}'
        # which is expected to print average, stddev, max and min latency
        # (converted to milliseconds), one value per line.
        rados_bench_ran, output = self.run_command(
            ['timeout', '30s', 'rados', '-p', 'rbd', 'bench', '10',
             'write', '-t', '1', '-b', '65536', '2>/dev/null', '|',
             'grep', '-i', 'latency', '|', 'awk',
             '\'{print 1000*$3}\''], False)
        if rados_bench_ran:
            results = output.split('\n')
            self.dispatch_value(
                'cluster', 'avg_latency', results[0],
                self.ceph_rados_bench_interval)
            self.dispatch_value(
                'cluster', 'stddev_latency', results[1],
                self.ceph_rados_bench_interval)
            self.dispatch_value(
                'cluster', 'max_latency', results[2],
                self.ceph_rados_bench_interval)
            self.dispatch_value(
                'cluster', 'min_latency', results[3],
                self.ceph_rados_bench_interval)

    def read_ceph_mon(self):
        """Reads stats from "ceph mon dump" command."""
        mon_dump_ran, output = self.run_command(
            ['ceph', 'mon', 'dump', '-f', 'json', '--cluster',
             self.ceph_cluster])
        if mon_dump_ran:
            json_data = json.loads(output)
            self.dispatch_value(
                'mon', 'number', len(json_data['mons']),
                self.ceph_mon_stats_interval)
            self.dispatch_value(
                'mon', 'quorum', len(json_data['quorum']),
                self.ceph_mon_stats_interval)
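
    # Abbreviated, illustrative "ceph mon dump -f json" output showing only
    # the fields consumed above:
    #   {"mons": [{"rank": 0, "name": "controller-0", ...}, ...],
    #    "quorum": [0, 1, 2], ...}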

    def read_ceph_osd(self):
        """Reads stats from "ceph osd dump" command."""
        osd_dump_ran, output = self.run_command(
            ['ceph', 'osd', 'dump', '-f', 'json', '--cluster',
             self.ceph_cluster])
        if osd_dump_ran:
            json_data = json.loads(output)
            self.dispatch_value(
                'pool', 'number', len(json_data['pools']),
                self.ceph_osd_stats_interval)
            for pool in json_data['pools']:
                pool_name = 'pool-{}'.format(pool['pool_name'])
                self.dispatch_value(
                    pool_name, 'size', pool['size'],
                    self.ceph_osd_stats_interval)
                self.dispatch_value(
                    pool_name, 'pg_num', pool['pg_num'],
                    self.ceph_osd_stats_interval)
                self.dispatch_value(
                    pool_name, 'pgp_num', pool['pg_placement_num'],
                    self.ceph_osd_stats_interval)

            osds_up = 0
            osds_down = 0
            osds_in = 0
            osds_out = 0
            for osd in json_data['osds']:
                if osd['up'] == 1:
                    osds_up += 1
                else:
                    osds_down += 1
                if osd['in'] == 1:
                    osds_in += 1
                else:
                    osds_out += 1
            self.dispatch_value(
                'osd', 'up', osds_up, self.ceph_osd_stats_interval)
            self.dispatch_value(
                'osd', 'down', osds_down, self.ceph_osd_stats_interval)
            self.dispatch_value(
                'osd', 'in', osds_in, self.ceph_osd_stats_interval)
            self.dispatch_value(
                'osd', 'out', osds_out, self.ceph_osd_stats_interval)
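
    # Abbreviated, illustrative "ceph osd dump -f json" output showing only
    # the fields consumed above:
    #   {"pools": [{"pool_name": "vms", "size": 3, "pg_num": 128,
    #               "pg_placement_num": 128, ...}, ...],
    #    "osds": [{"osd": 0, "up": 1, "in": 1, ...}, ...]}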

    def read_ceph_pg(self):
        """Reads stats from "ceph pg dump" command."""
        pg_dump_ran, output = self.run_command(
            ['ceph', 'pg', 'dump', '-f', 'json', '--cluster',
             self.ceph_cluster])
        if pg_dump_ran:
            json_data = json.loads(output)
            pgs = {}
            for pg in json_data['pg_stats']:
                for state in pg['state'].split('+'):
                    if state not in pgs:
                        pgs[state] = 0
                    pgs[state] += 1

            for state in pgs:
                self.dispatch_value(
                    'pg', state, pgs[state], self.ceph_pg_stats_interval)

            for osd in json_data['osd_stats']:
                osd_id = 'osd-{}'.format(osd['osd'])
                self.dispatch_value(
                    osd_id, 'kb_used', osd['kb_used'],
                    self.ceph_pg_stats_interval)
                self.dispatch_value(
                    osd_id, 'kb_total', osd['kb'], self.ceph_pg_stats_interval)
                self.dispatch_value(
                    osd_id, 'snap_trim_queue_len', osd['snap_trim_queue_len'],
                    self.ceph_pg_stats_interval)
                self.dispatch_value(
                    osd_id, 'num_snap_trimming', osd['num_snap_trimming'],
                    self.ceph_pg_stats_interval)
                if 'fs_perf_stat' in osd:
                    self.dispatch_value(
                        osd_id, 'apply_latency_ms',
                        osd['fs_perf_stat']['apply_latency_ms'],
                        self.ceph_pg_stats_interval)
                    self.dispatch_value(
                        osd_id, 'commit_latency_ms',
                        osd['fs_perf_stat']['commit_latency_ms'],
                        self.ceph_pg_stats_interval)
                elif 'perf_stat' in osd:
                    self.dispatch_value(
                        osd_id, 'apply_latency_ms',
                        osd['perf_stat']['apply_latency_ms'],
                        self.ceph_pg_stats_interval)
                    self.dispatch_value(
                        osd_id, 'commit_latency_ms',
                        osd['perf_stat']['commit_latency_ms'],
                        self.ceph_pg_stats_interval)
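
    # Abbreviated, illustrative "ceph pg dump -f json" output showing only
    # the fields consumed above (the per-OSD latency block is named
    # "fs_perf_stat" or "perf_stat" depending on the Ceph release, hence
    # both branches):
    #   {"pg_stats": [{"state": "active+clean", ...}, ...],
    #    "osd_stats": [{"osd": 0, "kb_used": 123456, "kb": 976762584,
    #                   "snap_trim_queue_len": 0, "num_snap_trimming": 0,
    #                   "fs_perf_stat": {"apply_latency_ms": 1,
    #                                    "commit_latency_ms": 1}, ...}, ...]}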

    def read_ceph_pool(self):
        """Reads stats from "ceph osd pool" and "ceph df" commands."""
        stats_ran, stats_output = self.run_command(
            ['ceph', 'osd', 'pool', 'stats', '-f', 'json'])
        df_ran, df_output = self.run_command(['ceph', 'df', '-f', 'json'])

        if stats_ran:
            json_stats_data = json.loads(stats_output)
            for pool in json_stats_data:
                pool_key = 'pool-{}'.format(pool['pool_name'])
                for stat in (
                        'read_bytes_sec', 'write_bytes_sec',
                        'read_op_per_sec', 'write_op_per_sec'):
                    value = 0
                    if stat in pool['client_io_rate']:
                        value = pool['client_io_rate'][stat]
                    self.dispatch_value(
                        pool_key, stat, value, self.ceph_pool_stats_interval)

        if df_ran:
            json_df_data = json.loads(df_output)
            for pool in json_df_data['pools']:
                pool_key = 'pool-{}'.format(pool['name'])
                for stat in ('bytes_used', 'kb_used', 'objects'):
                    value = pool['stats'][stat] if stat in pool['stats'] else 0
                    self.dispatch_value(
                        pool_key, stat, value, self.ceph_pool_stats_interval)

            if 'total_bytes' in json_df_data['stats']:
                # ceph 0.84+
                self.dispatch_value(
                    'cluster', 'total_space',
                    int(json_df_data['stats']['total_bytes']),
                    self.ceph_pool_stats_interval)
                self.dispatch_value(
                    'cluster', 'total_used',
                    int(json_df_data['stats']['total_used_bytes']),
                    self.ceph_pool_stats_interval)
                self.dispatch_value(
                    'cluster', 'total_avail',
                    int(json_df_data['stats']['total_avail_bytes']),
                    self.ceph_pool_stats_interval)
            else:
                # ceph < 0.84
                self.dispatch_value(
                    'cluster', 'total_space',
                    int(json_df_data['stats']['total_space']) * 1024.0,
                    self.ceph_pool_stats_interval)
                self.dispatch_value(
                    'cluster', 'total_used',
                    int(json_df_data['stats']['total_used']) * 1024.0,
                    self.ceph_pool_stats_interval)
                self.dispatch_value(
                    'cluster', 'total_avail',
                    int(json_df_data['stats']['total_avail']) * 1024.0,
                    self.ceph_pool_stats_interval)
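
    # Abbreviated, illustrative output for the two commands above, showing
    # only the fields consumed:
    #   ceph osd pool stats -f json:
    #     [{"pool_name": "vms",
    #       "client_io_rate": {"read_bytes_sec": 0, "write_bytes_sec": 0,
    #                          "read_op_per_sec": 0, "write_op_per_sec": 0}},
    #      ...]
    #   ceph df -f json:
    #     {"pools": [{"name": "vms",
    #                 "stats": {"bytes_used": 0, "kb_used": 0,
    #                           "objects": 0}}, ...],
    #      "stats": {"total_bytes": 0, "total_used_bytes": 0,
    #                "total_avail_bytes": 0}}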

    def run_command(self, command, check_output=True):
        """Run a command for this collectd plugin. Returns a tuple of
        (success, output), or (False, None) on failure.
        """
        output = None
        try:
            if check_output:
                output = subprocess.check_output(command)
            else:
                # Join the arguments and run them through a shell so that
                # pipelines and redirections (as used by the rados bench
                # reader) work.
                stdin, stdout, stderr = os.popen3(' '.join(command))
                output = stdout.read()
        except Exception as exc:
            collectd.error(
                'collectd-ceph-storage: {} exception: {}'.format(command, exc))
            collectd.error(
                'collectd-ceph-storage: {} traceback: {}'
                .format(command, traceback.format_exc()))
            return False, None

        if output is None:
            collectd.error(
                'collectd-ceph-storage: failed to {}: output is None'
                .format(command))
            return False, None
        return True, output


collectd_ceph_storage = CollectdCephStorage()
collectd.register_config(collectd_ceph_storage.configure_callback)