Sergey Galkin 1cc90a4123 Add scripts for mesos marathon performance testing
This is a draft

1. application_managment_helper.py is a helper for creating or deleting apps in Marathon
2. marathon-scale-tests.py is a script for testing Marathon and Mesos
3. run_full_tests.sh is a script that runs all tests and generates JSON with the results
4. result-analize.py is a script that generates an RST report with a table and graphs

Change-Id: Ic7dc472c90bf5746c3c2b364afc694c83b8509b7
2016-03-31 14:16:41 +03:00

493 lines
18 KiB
Python

import argparse
import ast
import hashlib
import json
import logging
from multiprocessing import Pool
import operator
import random
import signal
import statistics
import sys
import time

from marathon import MarathonClient
from marathon.models import MarathonApp
from marathon.models import MarathonHealthCheck
from marathon.models.container import MarathonContainer
from marathon.models.container import MarathonContainerPortMapping
from marathon.models.container import MarathonDockerContainer
# Baseline resources given to every MarathonApp built by create_app(); they
# are also the starting values that update_and_wait_app() scales.
MEM = 256  # passed as MarathonApp(mem=...); presumably MiB -- TODO confirm
CPUS = 1  # passed as MarathonApp(cpus=...)
DISK = 50  # passed as MarathonApp(disk=...); presumably MiB -- TODO confirm
def percentage(part, whole):
    """Return ``part`` expressed as a percentage of ``whole``.

    Both arguments are converted with ``float()``; a ``whole`` of zero
    raises ``ZeroDivisionError``.
    """
    scaled_part = 100 * float(part)
    return scaled_part / float(whole)
def init_worker():
    """Make the calling process ignore SIGINT.

    Passed to ``Pool`` as the worker initializer by concur_operations(),
    which handles ``KeyboardInterrupt`` itself in the parent process.
    """
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
def is_timeout(start_time, timeout):
    """Tell whether more than ``timeout`` seconds passed since ``start_time``.

    ``start_time`` is a ``time.time()`` timestamp.  A warning is logged
    whenever the limit has been exceeded.
    """
    logging.debug("[is_timeout] start timeout is {}".format(start_time))
    elapsed = time.time() - start_time
    if elapsed > timeout:
        logging.warning("Timeout")
        return True
    return False
def check_instances(check, app_instances, app_name, start_time, timeout,
                    ready_tasks=None):
    """Poll the app's tasks until enough of them pass ``check``.

    :param check: ``"task"`` counts tasks that have a ``started_at`` value;
        ``"health"`` counts tasks with truthy ``health_check_results``.
        Any other value counts nothing.
    :param app_instances: number of passing tasks to wait for.
    :param app_name: application id given to ``cluster.list_tasks``.
    :param start_time: ``time.time()`` timestamp the timeout is measured from.
    :param timeout: seconds after which polling stops.
    :param ready_tasks: optional task list captured before the operation;
        when given (``"task"`` check only) a task is counted only if its
        ``started_at`` differs from that of a reference entry in this list.
    :returns: the number of passing tasks seen in the last poll.
    """
    in_progress = 0
    while in_progress < app_instances:
        # Every poll recounts from scratch against a fresh task listing.
        in_progress = 0
        for task in cluster.list_tasks(app_name):
            if check == "task":
                if ready_tasks is None:
                    passed = task.started_at is not None
                else:
                    # The reference entry is the first previous task whose
                    # id DIFFERS from the current one (or the last entry
                    # when every id matches).
                    # NOTE(review): this looks inverted -- matching on equal
                    # ids seems to be the intent -- but the comparison is
                    # kept as-is because callers depend on the current
                    # counting; confirm before changing.
                    reference = -1
                    for reference, previous in enumerate(ready_tasks):
                        if previous.id != task.id:
                            break
                    passed = (
                        task.started_at is not None and
                        task.started_at != ready_tasks[reference].started_at)
                if passed:
                    logging.info("[check_instances] Task started "
                                 "at {}".format(task.started_at))
                    in_progress += 1
            elif check == "health" and task.health_check_results:
                in_progress += 1
        if is_timeout(start_time, timeout):
            return in_progress
    return in_progress
def delete_app(app_name, force=False):
    """Ask Marathon to delete the application ``app_name``.

    ``force`` is forwarded unchanged to ``cluster.delete_app``.
    """
    request = {"app_id": app_name, "force": force}
    cluster.delete_app(**request)
def calculate_results_per_operation(results):
    """Turn the raw measurements of one operation into a result dict.

    :param results: sequence laid out as ``[app_name, instances,
        successful_instances, app_full_time, per_task_start_times]``.
    :returns: dict with the first three values as given, the success
        percentage, and the full time plus mean/median/min/max of the
        per-task start times, each rounded to two decimals.
    """
    stats = {}
    stats["app_name"] = results[0]
    stats["instances"] = results[1]
    stats["successful_instances"] = results[2]
    stats["successful_instance_percent"] = percentage(results[2], results[1])
    stats["app_full_time"] = round(results[3], 2)
    start_times = results[4]
    stats["instances_mean"] = round(statistics.mean(start_times), 2)
    stats["instances_median"] = round(statistics.median(start_times), 2)
    stats["instances_min"] = round(min(start_times), 2)
    stats["instances_max"] = round(max(start_times), 2)
    return stats
def check_in_deployment(app_name, timeout):
    """Block until the app has no deployments left or ``timeout`` expires.

    The app is looked up in ``cluster.list_apps()`` by the id
    ``"/<app_name>"``.  If it is never found the loop only ends on the
    timeout.  Always returns ``None``.
    """
    app_path = "/{}".format(app_name)
    started = time.time()
    pending = 1  # assume a deployment is running until the API says otherwise
    while pending != 0 and not is_timeout(started, timeout):
        for app in cluster.list_apps():
            if app.id == app_path:
                pending = len(app.deployments)
    return None
def check_operation_status(start_time, app_name, app_instances,
                           timeout, ready_tasks=None):
    """Wait for an operation on an app to finish and collect its timings.

    :param start_time: ``time.time()`` timestamp taken when the operation
        was issued.
    :param app_name: application id.
    :param app_instances: number of started tasks to wait for.
    :param timeout: seconds allowed for each waiting phase.
    :param ready_tasks: optional pre-operation task list, forwarded to
        check_instances().
    :returns: tuple ``(successful_instances, start_durations,
        app_full_time)``; ``start_durations`` holds
        ``started_at - staged_at`` in seconds for each started task, or
        ``[0]`` when no task has started.
    """
    successful_instances = check_instances(
        "task", app_instances, app_name, start_time, timeout, ready_tasks)

    start_durations = []
    for task in cluster.list_tasks(app_name):
        logging.info("[check_operation_status] Task started at ="
                     " {}".format(task.started_at))
        logging.debug("[check_operation_status] {} - {} ".format(
            task.started_at, task.staged_at))
        if task.started_at is None:
            continue
        delta = task.started_at - task.staged_at
        start_durations.append(delta.total_seconds())
    if not start_durations:
        # Keeps the statistics computed by callers from failing on no data.
        start_durations = [0]

    check_in_deployment(app_name, timeout)
    logging.debug("[check_operation_status] start time is {}".format(
        start_time))
    return successful_instances, start_durations, time.time() - start_time
def restart_and_wait_app(app_name):
    """Restart an app, wait for it (up to 600 s) and return its statistics.

    The task list captured before the restart is handed to
    check_operation_status() as the reference set.  The result is the dict
    built by calculate_results_per_operation().
    """
    timeout = 600
    tasks_before = cluster.list_tasks(app_name)
    app_instances = len(tasks_before)

    start_time = time.time()
    cluster.restart_app(app_id=app_name)
    time.sleep(5)

    status = check_operation_status(start_time, app_name, app_instances,
                                    timeout, tasks_before)
    successful_instances, start_durations, app_full_time = status
    measurements = [app_name, app_instances, successful_instances,
                    app_full_time, start_durations]
    return calculate_results_per_operation(measurements)
def update_and_wait_cpu(app_name):
    """Double the app's CPU setting and return the update statistics."""
    return update_and_wait_app(app_name, scale_param="cpu", scale="2")
def update_and_wait_mem(app_name):
    """Double the app's memory setting and return the update statistics."""
    return update_and_wait_app(app_name, scale_param="mem", scale="2")
def update_and_wait_disk(app_name):
    """Double the app's disk setting and return the update statistics."""
    return update_and_wait_app(app_name, scale_param="disk", scale="2")
def update_and_wait_instances(app_name):
    """Double the app's instance count and return the update statistics."""
    return update_and_wait_app(app_name, scale_param="instances", scale="2")
# Arithmetic operators accepted as ``scale_type`` by update_and_wait_app().
_SCALE_OPERATORS = {
    "*": operator.mul,
    "+": operator.add,
    "-": operator.sub,
    "/": operator.truediv,
    "//": operator.floordiv,
    "%": operator.mod,
    "**": operator.pow,
}


def _to_number(value):
    """Return ``value`` as an int when possible, otherwise as a float."""
    if isinstance(value, (int, float)):
        return value
    try:
        return int(value)
    except ValueError:
        return float(value)


def _apply_scale(current, scale_type, scale):
    """Combine ``current`` with ``scale`` using the ``scale_type`` operator."""
    try:
        operation = _SCALE_OPERATORS[scale_type]
    except KeyError:
        raise ValueError("Unsupported scale_type {!r}".format(scale_type))
    return operation(current, _to_number(scale))


def update_and_wait_app(app_name, scale_param, scale, scale_type="*"):
    """Change one resource of an app, wait for the update, return statistics.

    :param app_name: application id.
    :param scale_param: which value to change: ``"cpu"``, ``"mem"``,
        ``"disk"`` or ``"instances"``.  Any other value leaves all four
        unchanged.
    :param scale: number (or numeric string) combined with the current value.
    :param scale_type: arithmetic operator as a string, ``"*"`` by default;
        one of ``* + - / // % **``.
    :returns: the dict built by calculate_results_per_operation().
    :raises ValueError: if ``scale_type`` is not a supported operator or
        ``scale`` is not numeric.

    The new value is computed with an operator table rather than ``eval()``
    on a formatted string, so only arithmetic on numbers can be requested.
    """
    timeout = 600
    list_tasks = cluster.list_tasks(app_name)
    app_instances = len(list_tasks)

    # cpu/mem/disk start from the module defaults, instances from the
    # number of tasks the app currently has.
    settings = {"cpu": CPUS, "mem": MEM, "disk": DISK,
                "instances": app_instances}
    if scale_param in settings:
        settings[scale_param] = _apply_scale(settings[scale_param],
                                             scale_type, scale)
    instances = settings["instances"]

    updated_app = MarathonApp(cpus=settings["cpu"], mem=settings["mem"],
                              disk=settings["disk"], instances=instances)
    start_time = time.time()
    cluster.update_app(app_id=app_name, app=updated_app)
    time.sleep(5)
    successful_instances, all_starting, app_full_time = \
        check_operation_status(start_time, app_name, instances,
                               timeout, list_tasks)
    # NOTE(review): the report carries the pre-update task count while the
    # wait above targets the post-update count; when "instances" is scaled
    # the success percentage can therefore exceed 100 -- confirm intent.
    return calculate_results_per_operation([app_name, app_instances,
                                            successful_instances,
                                            app_full_time,
                                            all_starting])
def delete_and_wait_app(app_name):
    """Delete an app and measure how long it takes to disappear.

    Polls ``cluster.list_apps()`` (for at most 600 s) until no listed app
    has the id ``"/<app_name>"``.  The wait is tied to this app only, so
    other applications in the cluster -- including ones being deleted by
    parallel workers -- no longer inflate the measured time.

    :returns: ``{"app_name": ..., "delete_time": seconds}`` rounded to two
        decimals, or ``delete_time`` of ``None`` when the deletion raised.
    """
    timeout = 600
    app_path = "/{}".format(app_name)
    start_time = time.time()
    try:
        cluster.delete_app(app_id=app_name)
        while (any(app.id == app_path for app in cluster.list_apps()) and
               time.time() - start_time < timeout):
            time.sleep(0.01)
        logging.debug("[delete_and_wait_app] start time is {}".format(
            start_time))
        end_time = time.time() - start_time
        return {"app_name": app_name,
                "delete_time": round(end_time, 2)
                }
    except Exception as ex:
        # Best effort: a failed deletion is reported, not raised.
        # SystemExit and KeyboardInterrupt are allowed to propagate.
        logging.error(ex)
        return {"app_name": app_name,
                "delete_time": None
                }
def create_and_delete_app(app_instances):
    """Create an app with ``app_instances`` tasks, then delete it.

    Returns whatever create_app() returns for that run.
    """
    outcome = create_app(app_instances, delete=True)
    return outcome
def create_several_apps(apps_amount, instances_amount):
    """Create ``apps_amount`` apps one after another and return their names.

    Each app gets ``instances_amount`` instances.  Unless ``args.silent``
    is set, a progress line is written to stdout: the running count on
    every tenth app and a dot for the others.
    """
    verbose = not args.silent
    if verbose:
        print("======= Creating {} applications, with {} instances "
              "===============".format(apps_amount, instances_amount))
        sys.stdout.write('Creating apps: ')

    created = []
    for count in range(apps_amount):
        if verbose:
            marker = '.' if count % 10 else str(count)
            sys.stdout.write(marker)
            sys.stdout.flush()
        app = create_app(app_instances=instances_amount,
                         need_statistics=False)
        created.append(app["app_name"])

    if verbose:
        print(str(apps_amount))
    return created
def create_app(app_instances, delete=False,
               timeout=1200, need_statistics=True):
    """Create a randomly named nginx app in Marathon and wait for its tasks.

    :param app_instances: number of instances to request.
    :param delete: force-delete the app once it has been measured.
    :param timeout: seconds to wait for the tasks and the deployment.
    :param need_statistics: return the timing statistics rather than only
        the app name.
    :returns: ``{"app_name": name}`` when ``need_statistics`` is false,
        otherwise a dict that also carries the full time, the requested and
        successful instance counts, mean/median/min/max of the per-task
        start times (rounded to two decimals) and ``id_run``.
    """
    port_mapping = MarathonContainerPortMapping(container_port=80,
                                                protocol="tcp")
    app_docker = MarathonDockerContainer(
        image="nginx",
        network="BRIDGE",
        port_mappings=[port_mapping])
    app_container = MarathonContainer(docker=app_docker)
    http_health_check = MarathonHealthCheck(protocol="HTTP",
                                            path="/",
                                            grace_period_seconds=300,
                                            interval_seconds=2,
                                            timeout_seconds=20,
                                            max_consecutive_failures=3)
    # hashlib needs bytes: hashing the bare str raises TypeError on
    # Python 3, so the random seed text is encoded first.
    seed = str(random.random()).encode("utf-8")
    app_name = hashlib.md5(seed).hexdigest()
    logging.debug("Create cluster {}".format(app_name))
    new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK,
                          container=app_container,
                          health_checks=[http_health_check],
                          instances=app_instances,
                          max_launch_delay_seconds=5)
    start_time = time.time()
    cluster.create_app(app_id=app_name,
                       app=new_app)
    logging.debug("Get tasks for cluster {}".format(app_name))
    successful_instances, all_starting, app_full_time = \
        check_operation_status(start_time, app_name, app_instances, timeout)
    if delete:
        logging.debug('Delete {}'.format(app_name))
        delete_app(app_name, force=True)
    if not need_statistics:
        return {"app_name": app_name}
    return {"app_name": app_name,
            "app_full_time": round(app_full_time, 2),
            "instances": app_instances,
            "successful_instances": successful_instances,
            "instances_mean": round(statistics.mean(all_starting), 2),
            "instances_median": round(statistics.median(all_starting), 2),
            "instances_min": round(min(all_starting), 2),
            "instances_max": round(max(all_starting), 2),
            "id_run": id_run}
def concur_operations(function, arguments, concurrency):
    """Run ``function`` over ``arguments`` in a pool of worker processes.

    :param function: a callable, or the name of a module-level function of
        this file.
    :param arguments: a list/tuple of per-call arguments, or the ``str()``
        of such a list (the form the callers in this file pass).
    :param concurrency: number of worker processes.
    :returns: the list produced by ``pool.map``, or ``[]`` when the run was
        interrupted with Ctrl-C.
    :raises NameError: if ``function`` names nothing defined in this module.

    The target and its arguments are resolved explicitly instead of being
    spliced into a string for ``eval()``, so the argument text is only ever
    parsed as a literal.
    """
    if callable(function):
        worker = function
    else:
        try:
            worker = globals()[function]
        except KeyError:
            raise NameError("name {!r} is not defined".format(function))
    if isinstance(arguments, (list, tuple)):
        work_items = list(arguments)
    else:
        work_items = ast.literal_eval(arguments)

    pool = Pool(concurrency, init_worker)
    results = []
    try:
        results = pool.map(worker, work_items)
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print("Caught KeyboardInterrupt, terminating workers")
        pool.terminate()
        pool.join()
    except Exception:
        # Do not leave worker processes behind when a task fails.
        pool.terminate()
        pool.join()
        raise
    return results
def concur_create_apps(concurrency, instances):
    """Create and delete ``concurrency`` apps in parallel.

    Every app is requested with ``instances`` instances.  Returns the list
    of per-app results from concur_operations().
    """
    if not args.silent:
        banner = ("======= Creating applications with {} instances, "
                  "concurrency is {} "
                  "===============")
        print(banner.format(instances, concurrency))
    per_app_instances = [instances for _ in range(concurrency)]
    return concur_operations("create_and_delete_app", str(per_app_instances),
                             concurrency)
def concur_restart_apps(concurrency, instances):
    """Create ``concurrency`` apps, restart them in parallel, clean up.

    Returns the per-app restart results; the apps are force-deleted after
    the restarts have been measured.
    """
    app_names = create_several_apps(concurrency, instances)
    if not args.silent:
        banner = ("======= Restart applications with {} instances , "
                  "concurrency is {} "
                  "===============")
        print(banner.format(instances, concurrency))
    outcome = concur_operations("restart_and_wait_app", str(app_names),
                                concurrency)
    for name in app_names:
        delete_app(name, True)
    return outcome
def concur_update_app(update_type, concurrency, instances):
    """Create ``concurrency`` apps, update them in parallel, clean up.

    :param update_type: ``"cpu"``, ``"mem"``, ``"disk"`` or ``"instances"``;
        any other value performs no update and yields an empty result list.
    :param concurrency: number of apps and of parallel workers.
    :param instances: instances requested for each app.
    :returns: the per-app update results.
    """
    app_names = create_several_apps(concurrency, instances)
    if not args.silent:
        print("======= Update applications, concurrency is {} "
              "===============".format(concurrency))

    workers = {
        "cpu": "update_and_wait_cpu",
        "mem": "update_and_wait_mem",
        "disk": "update_and_wait_disk",
        "instances": "update_and_wait_instances",
    }
    outcome = []
    worker_name = workers.get(update_type)
    if worker_name is not None:
        outcome = concur_operations(worker_name, str(app_names), concurrency)

    for name in app_names:
        delete_app(name, True)
    return outcome
def concur_delete_apps(concurrency, instances):
    """Create ``concurrency`` apps, then delete them in parallel.

    Returns the per-app deletion results from concur_operations().
    """
    app_names = create_several_apps(concurrency, instances)
    if not args.silent:
        banner = ("======= Delete applications with {}, concurrency is {} "
                  "===============")
        print(banner.format(instances, concurrency))
    return concur_operations("delete_and_wait_app", str(app_names),
                             concurrency)
def calculate_summary(results):
    """Aggregate per-app result dicts into one summary dict.

    :param results: list of per-app dicts, either deletion results (they
        carry a ``"delete_time"`` key) or timing results with
        ``instances_*`` and ``app_full_time`` keys.
    :returns: dict with ``"type": "summary"``, the run metadata (``test``,
        ``concurrency``, ``nodes``, ``id_run``) and the aggregated
        statistics.

    The kind of result is recognised by the presence of ``"delete_time"``
    rather than by counting keys.  When every deletion failed the
    ``delete_time_*`` values are ``None`` instead of raising, and an empty
    ``results`` list yields a summary holding only the run metadata.
    """
    run_info = {
        "type": "summary",
        "test": args.tests,
        "concurrency": args.concurrency,
        "nodes": args.nodes,
        "id_run": id_run
    }
    if not results:
        return run_info

    if "delete_time" in results[0]:
        times = [result["delete_time"] for result in results
                 if result["delete_time"] is not None]
        sum_result = {
            "delete_time_max": max(times) if times else None,
            "delete_time_min": min(times) if times else None,
            "delete_time_mean": (round(statistics.mean(times), 2)
                                 if times else None),
            "delete_time_median": (round(statistics.median(times), 2)
                                   if times else None),
            "delete_fails": len(results) - len(times),
        }
    else:
        means = [result["instances_mean"] for result in results]
        medians = [result["instances_median"] for result in results]
        full_times = [result["app_full_time"] for result in results]
        sum_result = {
            "instances_max": max(result["instances_max"]
                                 for result in results),
            "instances_min": min(result["instances_min"]
                                 for result in results),
            "instances_mean": round(statistics.mean(means), 2),
            "instances_median": statistics.median(medians),
            "app_full_time_max": max(full_times),
            "app_full_time_min": min(full_times),
            "app_full_time_mean": round(statistics.mean(full_times), 2),
            "app_full_time_median": statistics.median(full_times),
        }
    sum_result.update(run_info)
    return sum_result
def print_results(results):
    """Print the per-app results and their summary as JSON on stdout.

    Every dict in ``results`` is tagged in place with ``"type": "single"``
    and a summary entry is appended.  ``args.only_summary`` restricts the
    output to the summary; ``args.pretty_output`` indents the JSON,
    otherwise one compact line followed by a comma is printed.
    """
    for entry in results:
        entry["type"] = "single"
    full_results = list(results)
    full_results.append(calculate_summary(results))

    if args.only_summary:
        summaries = [entry for entry in full_results
                     if entry["type"] == "summary"]
        if summaries:
            full_results = [summaries[-1]]

    if args.pretty_output:
        rendered = json.dumps(full_results, sort_keys=True,
                              indent=4, separators=(',', ': '))
    else:
        rendered = "{},".format(json.dumps(full_results, sort_keys=True))
    print(rendered)
if __name__ == '__main__':
    # Script entry point: parse the command line, set up the Marathon
    # client and logging, then run the selected test and print its results.
    # ``args``, ``cluster`` and ``id_run`` are deliberately module-level
    # names: the functions above read them as globals.
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--tests", help="Tests",
                        choices=['all', 'create', 'update_cpu',
                                 'update_mem', 'update_disk',
                                 'update_instances',
                                 'restart', 'delete'], required=True)
    parser.add_argument("-m", "--marathon",
                        help="Marathon URL, on example "
                             "http://172.20.8.34:8080/virt-env-2/marathon",
                        required=True)
    parser.add_argument("-c", "--concurrency",
                        help="Concurrency",
                        required=True)
    parser.add_argument("-n", "--nodes",
                        help="Number of tasks per application",
                        required=True)
    parser.add_argument("-l", "--log_level", help="logging level",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR',
                                 'CRITICAL'])
    parser.add_argument("-s", "--silent",
                        help="Print only results",
                        action="store_true")
    parser.add_argument("-o", "--only_summary",
                        help="Print only summary results",
                        action="store_true")
    parser.add_argument("-p", "--pretty_output",
                        help="Pretty json outpur",
                        action="store_true")
    args = parser.parse_args()
    cluster = MarathonClient(args.marathon)
    # hashlib needs bytes: hashing the bare str raises TypeError on
    # Python 3, so the random seed text is encoded first.
    id_run = hashlib.md5(str(random.random()).encode("utf-8")).hexdigest()

    # argparse restricts --log_level to valid level names, so a plain
    # attribute lookup replaces the former eval().
    if args.log_level is None:
        log_level = logging.DEBUG
    else:
        log_level = getattr(logging, args.log_level)
    logging.basicConfig(
        filename="tests-debug.log",
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    simple_tests = {
        "create": concur_create_apps,
        "restart": concur_restart_apps,
        "delete": concur_delete_apps,
    }
    update_prefix = "update_"
    if args.tests in simple_tests:
        print_results(simple_tests[args.tests](int(args.concurrency),
                                               int(args.nodes)))
    elif args.tests.startswith(update_prefix):
        # "update_cpu" -> "cpu", "update_mem" -> "mem", and so on.
        print_results(concur_update_app(args.tests[len(update_prefix):],
                                        int(args.concurrency),
                                        int(args.nodes)))
    # NOTE(review): "all" is an accepted choice, but nothing is run for it.