# coding=utf-8

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import math

from interval import interval
import numpy as np
from scipy import stats
from sklearn import cluster as skl

from rally_runners.reliability import types

MIN_CLUSTER_WIDTH = 3  # filter out clusters with fewer items
MAX_CLUSTER_GAP = 6  # max allowed gap inside a cluster (otherwise split it)
WINDOW_SIZE = 21  # window size for average duration calculation
WARM_UP_CUTOFF = 10  # drop the first N points from the etalon
DEGRADATION_THRESHOLD = 4  # how many sigmas duration may differ from etalon mean


def find_clusters(arr, filter_fn, max_gap=MAX_CLUSTER_GAP,
                  min_cluster_width=MIN_CLUSTER_WIDTH):
    """Find clusters of 1s in a sequence of 0s and 1s

    The given array is passed through the filter_fn function, which produces
    a sequence of 0s and 1s. The 1s are then grouped into clusters so that:
     * a cluster contains no more than max_gap consecutive 0s
     * a cluster contains at least min_cluster_width 1s

    :param arr: initial array
    :param filter_fn: transformation x -> [0, 1]
    :param max_gap: maximum allowed number of consecutive 0s inside a cluster
    :param min_cluster_width: minimum cluster width
    :return: multi-interval (i.e. list of intervals)
    """
    clusters = interval()

    start = None
    end = None

    for i, y in enumerate(arr):
        v = filter_fn(y)
        if v:
            if start is None:  # compare with None: index 0 is a valid start
                start = i
            end = i
        else:
            if end is not None and i - end > max_gap:
                if end - start >= min_cluster_width:
                    clusters |= interval([start, end])
                start = end = None

    if end is not None:
        if end - start >= min_cluster_width:
            clusters |= interval([start, end])

    return clusters
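
# A minimal usage sketch (illustrative, not part of the original module):
# with the defaults max_gap=6 and min_cluster_width=3,
#
#     find_clusters([0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1],
#                   filter_fn=lambda x: 1 if x else 0)
#
# returns interval([1.0, 4.0]): the 1s at indexes 1..4 form a cluster, while
# the pair at indexes 12..13 is too narrow and is dropped.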


def convert_rally_data(data):
    """Convert raw Rally data into [DataRow]

    :param data: raw Rally data
    :return: ([DataRow], index of the hook)
    """
    results = data['result']
    start = results[0]['timestamp']  # start of the run

    hooks = data['hooks']
    hook_index = 0

    if hooks:
        # the time when the hook started
        hook_start_time = hooks[0]['started_at'] - start
    else:
        # no hooks, treat all data as etalon
        hook_start_time = results[-1]['timestamp']

    table = []
    for index, result in enumerate(results):
        time = result['timestamp'] - start
        duration = result['duration']

        if time + duration < hook_start_time:
            hook_index = index

        table.append(types.DataRow(index=index, time=time, duration=duration,
                                   error=bool(result['error'])))

    return table, hook_index
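
# The raw Rally data consumed above is assumed (inferred from the fields
# accessed, the values below are made up) to look like:
#
#     {'result': [{'timestamp': 1000.0, 'duration': 1.2, 'error': []}, ...],
#      'hooks': [{'started_at': 1042.0}, ...]}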


def calculate_array_stats(data):
    data = np.array(data)
    return types.ArrayStats(mean=np.mean(data), median=np.median(data),
                            p95=np.percentile(data, 95), var=np.var(data),
                            std=np.std(data), count=len(data))


def indexed_interval_to_time_interval(table, src_interval):
    """Convert an interval of table indexes into a time interval

    :param table: [DataRow] source data
    :param src_interval: interval of array indexes
    :return: ClusterStats
    """
    start_index = int(src_interval.inf)
    end_index = int(src_interval.sup)

    if start_index > 0:
        d_start = (table[start_index].time - table[start_index - 1].time) / 2
    else:
        d_start = 0

    if end_index < len(table) - 1:
        d_end = (table[end_index + 1].time - table[end_index].time) / 2
    else:
        d_end = 0

    start_time = table[start_index].time - d_start
    end_time = table[end_index].time + d_end
    var = d_start + d_end
    duration = end_time - start_time
    count = sum(1 if start_time <= p.time <= end_time else 0 for p in table)

    return types.ClusterStats(start=start_time, end=end_time, count=count,
                              duration=types.MeanVar(duration, var))
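
# Worked example (illustrative): for rows at times [0, 10, 20, 30] and the
# index interval [1, 2], the boundaries are widened by half of the
# neighbouring gaps: start_time = 10 - 5 = 5, end_time = 20 + 5 = 25, so
# duration = 20 with var = 10, and count = 2.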


def calculate_error_area(table):
    """Calculate error statistics

    :param table: [DataRow] source data
    :return: list of time intervals where errors occur
    """
    error_clusters = find_clusters(
        (p.error for p in table),
        filter_fn=lambda x: 1 if x else 0,
        min_cluster_width=0
    )
    error_stats = [indexed_interval_to_time_interval(table, cluster)
                   for cluster in error_clusters]
    return error_stats


def calculate_anomaly_area(table, quantile=0.9):
    """Find anomalies

    :param table: [DataRow] source data
    :param quantile: float, default 0.9
    :return: list of time intervals where anomalies occur
    """
    table = [p for p in table if not p.error]  # rm errors
    x = [p.duration for p in table]
    # MeanShift expects a 2-D array, so pad the durations with a zero column
    X = np.array(list(zip(x, np.zeros(len(x)))), dtype=float)
    bandwidth = skl.estimate_bandwidth(X, quantile=quantile)
    mean_shift_algo = skl.MeanShift(bandwidth=bandwidth, bin_seeding=True)
    mean_shift_algo.fit(X)
    labels = mean_shift_algo.labels_
    lm = stats.mode(labels)

    # filter out the largest cluster
    vl = [0 if label == lm.mode else 1 for label in labels]

    anomaly_clusters = find_clusters(vl, filter_fn=lambda y: y)
    anomaly_stats = [indexed_interval_to_time_interval(table, cluster)
                     for cluster in anomaly_clusters]
    return anomaly_stats
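
# The idea above: cluster the 1-D duration series with MeanShift and treat
# the most populated cluster as normal behaviour; any point outside it is an
# anomaly candidate. Whether e.g. [1.0] * 50 + [5.0] * 5 + [1.0] * 50 splits
# the five slow points into their own cluster depends on the bandwidth that
# estimate_bandwidth derives from the quantile parameter.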


def calculate_smooth_data(table, window_size):
    """Smooth the data with a sliding-window mean

    :param table: [DataRow] source data
    :param window_size: number of points averaged per window
    :return: list of points of the smoothed data
    """
    table = [p for p in table if not p.error]  # rm errors
    smooth = []

    for i in range(0, len(table) - window_size):
        durations = [p.duration for p in table[i: i + window_size]]

        time = np.mean([p.time for p in table[i: i + window_size]])
        duration = np.mean(durations)
        var = abs(time - np.mean(
            [p.time for p in table[i + 1: i + window_size - 1]]))

        smooth.append(types.SmoothData(time=time, duration=duration, var=var))

    return smooth
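
# E.g. with window_size=3 the first smoothed point averages rows 0..2, the
# next rows 1..3, and so on; the result is len(table) - window_size points.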


def calculate_degradation_area(table, smooth, etalon_stats, etalon_threshold):
    table = [p for p in table if not p.error]  # rm errors
    if len(table) <= WINDOW_SIZE:
        return []

    mean_times = [p.time for p in smooth]
    mean_durations = [p.duration for p in smooth]
    mean_vars = [p.var for p in smooth]

    clusters = find_clusters(
        mean_durations,
        filter_fn=lambda y: 0 if abs(y) < etalon_threshold else 1)

    # calculate cluster duration
    degradation_cluster_stats = []
    for cluster in clusters:
        start_idx = int(cluster.inf)
        end_idx = int(cluster.sup)
        start_time = mean_times[start_idx]
        end_time = mean_times[end_idx]
        duration = end_time - start_time
        var = np.mean(mean_vars[start_idx: end_idx])

        # point durations
        point_durations = []
        for p in table:
            if start_time < p.time < end_time:
                point_durations.append(p.duration)

        # calculate difference between means
        # http://onlinestatbook.com/2/tests_of_means/difference_means.html
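        # Standard error of the difference between two sample means, as
        # coded below (matching the linked reference):
        #   SE = sqrt(var_a / n_a + var_e / n_e), dof = n_a + n_e - 2
        # where "a" is the degraded window and "e" the etalon sample.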
        anomaly_mean = np.mean(point_durations)
        anomaly_var = np.var(point_durations)
        se = math.sqrt(anomaly_var / len(point_durations) +
                       etalon_stats.var / etalon_stats.count)
        dof = etalon_stats.count + len(point_durations) - 2
        mean_diff = anomaly_mean - etalon_stats.mean
        conf_interval = stats.t.interval(0.95, dof, loc=mean_diff, scale=se)

        degradation = types.MeanVar(
            mean_diff, np.mean([mean_diff - conf_interval[0],
                                conf_interval[1] - mean_diff]))
        degradation_ratio = types.MeanVar(
            anomaly_mean / etalon_stats.mean,
            np.mean([(mean_diff - conf_interval[0]) / etalon_stats.mean,
                     (conf_interval[1] - mean_diff) / etalon_stats.mean]))

        logging.debug('Mean diff: %s' % mean_diff)
        logging.debug('Conf int: %s' % str(conf_interval))

        degradation_cluster_stats.append(types.DegradationClusterStats(
            start=start_time, end=end_time,
            duration=types.MeanVar(duration, var),
            degradation=degradation, degradation_ratio=degradation_ratio,
            count=len(point_durations)
        ))

    return degradation_cluster_stats


def process_one_run(rally_data):
    """Process a single Rally run (raw output of a single task iteration)

    This function calculates statistics for a single run, including the
    baseline (etalon) stats, error stats, anomalies and areas of degraded
    performance.

    :param rally_data: raw Rally data
    :return: RunResult
    """
    data, hook_index = convert_rally_data(rally_data)
    etalon = [p.duration for p in data[WARM_UP_CUTOFF:hook_index]]

    etalon_stats = calculate_array_stats(etalon)
    etalon_threshold = abs(etalon_stats.mean +
                           DEGRADATION_THRESHOLD * etalon_stats.std)
    etalon_interval = interval([data[WARM_UP_CUTOFF].time,
                                data[hook_index].time])[0]

    logging.debug('Hook index: %s' % hook_index)
    logging.debug('Etalon stats: %s' % str(etalon_stats))

    # calculate stats
    error_area = calculate_error_area(data)

    anomaly_area = calculate_anomaly_area(data)

    smooth_data = calculate_smooth_data(data, window_size=WINDOW_SIZE)

    degradation_area = calculate_degradation_area(
        data, smooth_data, etalon_stats, etalon_threshold)

    # log stats
    logging.debug('Error area: %s' % error_area)
    logging.debug('Anomaly area: %s' % anomaly_area)
    logging.debug('Degradation area: %s' % degradation_area)

    return types.RunResult(
        data=data,
        error_area=error_area,
        anomaly_area=anomaly_area,
        degradation_area=degradation_area,
        etalon_stats=etalon_stats,
        etalon_interval=etalon_interval,
        etalon_threshold=etalon_threshold,
        smooth_data=smooth_data,
    )


def process_all_runs(runs):
    """Process all runs from a Rally raw data report

    This function returns summary stats for all runs, including downtime
    duration, MTTR and performance degradation.

    :param runs: collection of Rally runs
    :return: SummaryResult
    """
    run_results = []
    downtime_statistic = []
    downtime_var = []
    ttr_statistic = []
    ttr_var = []
    degradation_statistic = []
    degradation_var = []
    degradation_ratio_statistic = []
    degradation_ratio_var = []

    for one_run in runs:
        run_result = process_one_run(one_run)
        run_results.append(run_result)

        ds = 0
        for stat in run_result.error_area:
            ds += stat.duration.statistic
            downtime_var.append(stat.duration.var)

        if run_result.error_area:
            downtime_statistic.append(ds)

        ts = ss = sr = 0
        for stat in run_result.degradation_area:
            ts += stat.duration.statistic
            ttr_var.append(stat.duration.var)
            ss += stat.degradation.statistic
            degradation_var.append(stat.degradation.var)
            sr += stat.degradation_ratio.statistic
            degradation_ratio_var.append(stat.degradation_ratio.var)

        if run_result.degradation_area:
            ttr_statistic.append(ts)
            degradation_statistic.append(ss)
            degradation_ratio_statistic.append(sr)

    downtime = None
    if downtime_statistic:
        downtime_mean = np.mean(downtime_statistic)
        se = math.sqrt((sum(downtime_var) +
                        np.var(downtime_statistic)) / len(downtime_statistic))
        downtime = types.MeanVar(downtime_mean, se)
    mttr = None
    if ttr_statistic:
        ttr_mean = np.mean(ttr_statistic)
        se = math.sqrt((sum(ttr_var) +
                        np.var(ttr_statistic)) / len(ttr_statistic))
        mttr = types.MeanVar(ttr_mean, se)
    degradation = None
    degradation_ratio = None
    if degradation_statistic:
        degradation = types.MeanVar(np.mean(degradation_statistic),
                                    np.mean(degradation_var))
        degradation_ratio = types.MeanVar(np.mean(degradation_ratio_statistic),
                                          np.mean(degradation_ratio_var))

    return types.SummaryResult(run_results=run_results, mttr=mttr,
                               degradation=degradation,
                               degradation_ratio=degradation_ratio,
                               downtime=downtime)
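
# A minimal usage sketch (illustrative; the file name and report layout are
# assumptions, the module itself defines no entry point):
#
#     import json
#
#     with open('rally_report.json') as f:
#         runs = json.load(f)
#     summary = process_all_runs(runs)
#     print(summary.downtime, summary.mttr, summary.degradation)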