255 lines
9.0 KiB
Python
Executable File
255 lines
9.0 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
|
|
from argparse import ArgumentParser
|
|
from datetime import datetime
|
|
from hashlib import md5
|
|
from marathon import MarathonClient
|
|
from marathon.models.container import MarathonContainer
|
|
from marathon.models.container import MarathonContainerPortMapping
|
|
from marathon.models.container import MarathonDockerContainer
|
|
from marathon.models import MarathonApp
|
|
from marathon.models import MarathonHealthCheck
|
|
from Queue import Empty
|
|
from Queue import Queue
|
|
from random import random
|
|
from threading import Thread
|
|
from time import sleep
|
|
from urllib2 import urlopen
|
|
|
|
MEM = 50
|
|
CPUS = 1
|
|
DISK = 50
|
|
|
|
|
|
class HealthCheckBencher(object):
|
|
def __init__(self, marathon_url, image, tasks):
|
|
self.concurrency = 20
|
|
self.docker_image = image
|
|
self.app_base_name = 'health-check-test-'
|
|
self.total_tasks_cout = int(tasks)
|
|
self.instances_per_app = 50
|
|
if tasks < self.instances_per_app:
|
|
self.instances_per_app = self.total_tasks_cout
|
|
self.app_count = 1
|
|
else:
|
|
self.app_count = self.total_tasks_cout/self.instances_per_app
|
|
self.heath_check_interval = 30
|
|
self.test_duration = 20
|
|
self.marathon_cluster = MarathonClient(marathon_url, timeout=240)
|
|
self.work_queue = Queue()
|
|
self.result_queue = Queue()
|
|
self.app_list_queue = Queue()
|
|
self.action_list = [self.start_collect,
|
|
'sleep={}'.format(self.test_duration),
|
|
self.get_stats]
|
|
|
|
def remove_apps(self):
|
|
apps = self.marathon_cluster.list_apps()
|
|
for app in apps:
|
|
if app.id.startswith("/"+self.app_base_name):
|
|
self.marathon_cluster.delete_app(app.id)
|
|
active = 0
|
|
while True:
|
|
apps = self.marathon_cluster.list_apps()
|
|
for app in apps:
|
|
if app.id.startswith(self.app_base_name):
|
|
active += 1
|
|
if active == 0:
|
|
break
|
|
|
|
def create_app(self, id):
|
|
port_mapping = MarathonContainerPortMapping(container_port=80,
|
|
protocol="tcp")
|
|
app_docker = MarathonDockerContainer(
|
|
image=self.docker_image,
|
|
network="BRIDGE",
|
|
force_pull_image=True,
|
|
port_mappings=[port_mapping])
|
|
app_container = MarathonContainer(docker=app_docker)
|
|
http_health_check = MarathonHealthCheck(
|
|
protocol="HTTP",
|
|
path="/status",
|
|
grace_period_seconds=300,
|
|
interval_seconds=self.heath_check_interval,
|
|
timeout_seconds=20,
|
|
max_consecutive_failures=0
|
|
)
|
|
|
|
app_suffix = str(md5(str(random())).hexdigest())
|
|
app_name = self.app_base_name + app_suffix
|
|
new_app = MarathonApp(cpus=CPUS, mem=MEM, disk=DISK,
|
|
container=app_container,
|
|
health_checks=[http_health_check],
|
|
instances=self.instances_per_app,
|
|
max_launch_delay_seconds=5)
|
|
print("Creating {}".format(app_name))
|
|
self.marathon_cluster.create_app(app_id=app_name, app=new_app)
|
|
self.app_list_queue.put(app_name)
|
|
return None
|
|
|
|
def wait_instances(self, app_name):
|
|
health_ok = 0
|
|
while health_ok < self.instances_per_app:
|
|
health_ok = 0
|
|
tasks = self.marathon_cluster.list_tasks(app_name)
|
|
for task in tasks:
|
|
if task.health_check_results:
|
|
health_ok += 1
|
|
|
|
def start_collect(self, task):
|
|
url = 'http://'+task['host']+':'+str(task['port'])+'/start_collect'
|
|
res = urlopen(url)
|
|
if res.getcode() == 200:
|
|
print(task['id']+': collecter was started')
|
|
else:
|
|
print(task['id']+': failed to start collecter')
|
|
|
|
def stop_collect(self, task):
|
|
url = 'http://'+task['host']+':'+str(task['port'])+'/stop_collect'
|
|
res = urlopen(url)
|
|
if res.getcode() == 200:
|
|
print(task['id']+': collecter was stopped')
|
|
else:
|
|
print(task['id']+': failed to stop collecter')
|
|
|
|
def clear_stats(self, task):
|
|
url = 'http://'+task['host']+':'+str(task['port'])+'/clear_stats'
|
|
res = urlopen(url)
|
|
if res.getcode() == 200:
|
|
print(task['id']+': stats was dropped')
|
|
else:
|
|
print(task['id']+': stats was dropped')
|
|
|
|
def get_stats(self, task):
|
|
url = 'http://'+task['host']+':'+str(task['port'])+'/get_timestamps'
|
|
try:
|
|
res = urlopen(url)
|
|
except Exception:
|
|
print("URL req failed")
|
|
self.result_queue.put({'id': task['id'],
|
|
'status': 'Failed',
|
|
'data': []})
|
|
return
|
|
if res.getcode() == 200:
|
|
data = res.read()
|
|
timestamps = data.split(',')
|
|
self.result_queue.put({'id': task['id'],
|
|
'status': 'ok',
|
|
'data': timestamps})
|
|
elif res.getcode() == 202:
|
|
print("Collecting is not enabled")
|
|
self.result_queue.put({'id': task['id'],
|
|
'status': 'Collecting is not enabled',
|
|
'data': []})
|
|
else:
|
|
print("Unknown response code")
|
|
self.result_queue.put({'id': task['id'],
|
|
'status': 'Unknown response code',
|
|
'data': []})
|
|
|
|
def repeat(self, action):
|
|
while self.work_queue.empty() is False:
|
|
try:
|
|
iteration = self.work_queue.get_nowait()
|
|
except Empty:
|
|
continue
|
|
action(iteration)
|
|
self.work_queue.task_done()
|
|
|
|
def fill_queue(self, iterations):
|
|
for iteration in iterations:
|
|
self.work_queue.put(iteration)
|
|
|
|
def get_tasks(self):
|
|
res = []
|
|
tasks = self.marathon_cluster.list_tasks()
|
|
for task in tasks:
|
|
if not task.id.startswith('health-check-test-'):
|
|
continue
|
|
res.append({'id': str(task.id),
|
|
'host': str(task.host),
|
|
'port': str(task.ports[0])})
|
|
return res
|
|
|
|
def create_apps(self):
|
|
self.fill_queue(range(self.app_count))
|
|
for thread_num in range(self.concurrency):
|
|
if self.work_queue.empty() is True:
|
|
break
|
|
worker = Thread(target=self.repeat, args=(self.create_app,))
|
|
worker.start()
|
|
self.work_queue.join()
|
|
|
|
while self.app_list_queue.empty() is False:
|
|
try:
|
|
app_name = self.app_list_queue.get_nowait()
|
|
except Empty:
|
|
continue
|
|
self.work_queue.put(app_name)
|
|
|
|
for thread_num in range(self.concurrency):
|
|
if self.work_queue.empty() is True:
|
|
break
|
|
worker = Thread(target=self.repeat, args=(self.wait_instances,))
|
|
worker.start()
|
|
self.work_queue.join()
|
|
|
|
def start_test(self):
|
|
task_list = self.get_tasks()
|
|
for action in self.action_list:
|
|
if isinstance(action, basestring):
|
|
if action.startswith('sleep='):
|
|
amount = int(action.split('=')[1])
|
|
sleep(60*amount)
|
|
continue
|
|
self.fill_queue(task_list)
|
|
for thread_num in range(self.concurrency):
|
|
if self.work_queue.empty() is True:
|
|
break
|
|
worker = Thread(target=self.repeat, args=(action,))
|
|
worker.start()
|
|
self.work_queue.join()
|
|
|
|
def generate_report(self):
|
|
today = datetime.today()
|
|
file_prefix = "{:%Y-%m-%d_%H_%M_%S-}".format(today)
|
|
file_name = (file_prefix +
|
|
'health_check_result-' +
|
|
str(self.total_tasks_cout) +
|
|
'tasks.csv')
|
|
|
|
f = open(file_name, "w")
|
|
f.write("Task ID,Health check timestamp")
|
|
|
|
while self.result_queue.empty() is False:
|
|
try:
|
|
result = self.result_queue.get_nowait()
|
|
except Empty:
|
|
continue
|
|
for timestamp in result['data']:
|
|
f.write("\n%s,%s" % (result['id'], timestamp))
|
|
|
|
f.close()
|
|
|
|
if __name__ == '__main__':
|
|
parser = ArgumentParser()
|
|
parser.add_argument("-m", "--marathon",
|
|
help="Marathon URL, on example "
|
|
"http://172.20.8.34:8080/virt-env-2/marathon",
|
|
required=True)
|
|
parser.add_argument("-t", "--tasks",
|
|
help="Total tasks count",
|
|
required=True)
|
|
parser.add_argument("-i", "--image",
|
|
help="Docker image path",
|
|
required=True)
|
|
args = parser.parse_args()
|
|
|
|
bencher = HealthCheckBencher(args.marathon, args.image, int(args.tasks))
|
|
|
|
bencher.create_apps()
|
|
bencher.start_test()
|
|
bencher.remove_apps()
|
|
bencher.generate_report()
|