Add support to do VM placements based on roles

1. Add support to do VM placements based on roles;
2. Remove all testing binaries from repo;
3. Fix the issue when creating static files in dib;

Change-Id: Ic7b7e2a2710c6ac135eb54dd82cc807c56804aa2
This commit is contained in:
Yichen Wang 2015-06-04 14:56:34 -07:00
parent 8b701a26ec
commit 21e6b0ef92
17 changed files with 622 additions and 490 deletions

2
.gitignore vendored
View File

@ -61,4 +61,4 @@ ChangeLog
# KloudBuster
*.html
*.qcow2
scale/dib/kloudbuster.d/

View File

@ -109,7 +109,6 @@ class BaseCompute(object):
except Exception:
return None
def find_flavor(self, flavor_type):
"""
Given a named flavor return the flavor
@ -208,3 +207,22 @@ class KeyPair(object):
Remove the keypair created by KloudBuster
"""
self.novaclient.keypairs.delete(self.keypair)
class Flavor(object):
def __init__(self, novaclient):
self.novaclient = novaclient
def create_flavor(self, name, ram, vcpus, disk, override=False):
# Creating flavors
if override:
self.delete_flavor(name)
return self.novaclient.flavors.create(name=name, ram=ram, vcpus=vcpus, disk=disk)
def delete_flavor(self, name):
try:
flavor = self.novaclient.flavors.find(name=name)
flavor.delete()
except Exception:
pass

View File

@ -123,7 +123,6 @@ class BaseNetwork(object):
# Create the VMs on specified network, first keypair, first secgroup
perf_instance.boot_info['image_name'] = config_scale['image_name']
perf_instance.boot_info['flavor_type'] = config_scale['flavor_type']
perf_instance.boot_info['keyname'] = self.router.user.key_name
perf_instance.boot_info['nic'] = [{'net-id': self.network['id']}]
perf_instance.boot_info['sec_group'] = self.secgroup_list[0].secgroup

View File

@ -5,9 +5,6 @@
# packages
image_name: 'Scale Image v8'
# Flavor to use for the test images - the flavor name must exist in OpenStack
flavor_type: 'm1.small'
# Config options common to client and server side
keystone_admin_role: "admin"
@ -15,7 +12,7 @@ keystone_admin_role: "admin"
cleanup_resources: True
# VM creation concurrency
vm_creation_concurrency: 5
vm_creation_concurrency: 10
#
# ssh access to the test VMs launched by kloudbuster is not required
@ -31,6 +28,15 @@ public_key_file:
# SERVER SIDE CONFIG OPTIONS
server:
# Flavor to use for the test images
flavor:
# Number of vCPUs for the flavor
vcpus: 1
# Memory for the flavor in MB
ram: 2048
# Size of local disk in GB
disk: 20
# Number of tenants to be created on the cloud
number_tenants: 1
@ -46,13 +52,13 @@ server:
networks_per_router: 1
# Number of VM instances to be created within the context of each Network
vms_per_network: 1
vms_per_network: 2
# Number of security groups per network
secgroups_per_network: 1
# Assign floating IP for every VM
use_floatingip: True
use_floatingip: False
# Placement hint
# Availability zone to use for servers in the server cloud
@ -66,7 +72,16 @@ server:
# CLIENT SIDE CONFIG OPTIONS
client:
# Assign floating IP for every VM
use_floatingip: True
use_floatingip: False
# Flavor to use for the test images
flavor:
# Number of vCPUs for the flavor
vcpus: 1
# Memory for the flavor in MB
ram: 2048
# Size of local disk in GB
disk: 20
# Placement hint
# Availability zone to use for clients in the client cloud

7
scale/cfg.topo.yaml Normal file
View File

@ -0,0 +1,7 @@
# Compute host topology file for running KloudBuster
servers_rack:
hh23-5
clients_rack:
hh23-6

0
scale/dib/build-image.sh Normal file → Executable file
View File

View File

@ -37,19 +37,7 @@ update-rc.d -f nginx remove
sed -i "s/127.0.0.1/0.0.0.0/g" /etc/redis/redis.conf
# if started nginx should be allowed to open more file descriptors
sed -i 's/start-stop-daemon\ --start/ulimit\ \-n\ 102400\n\ \ \ \ \0/g' /etc/init.d/nginx
# copy the agent python script
# the file is in the same directory as ./dib/elements, so need to go up one level
(
IFS=:
for p in $ELEMENTS_PATH; do
if [ -f $p/../../kb_vm_agent.py ]; then
cp $p/../../kb_vm_agent.py /var/tmp
break
fi
done
)
sed -i 's/start-stop-daemon\ --start/ulimit\ \-n\ 102400\n\t\0/g' /etc/init.d/nginx
# ======
# Client
@ -67,9 +55,8 @@ cd ..
rm -rf wrk2
# uninstall unneeded packages
apt-get -y remove git
apt-get -y remove python-pip
apt-get -y remove build-essential
apt-get -y --purge remove git
apt-get -y --purge remove python-pip
apt-get -y --purge remove build-essential
apt-get -y autoremove
apt-get -y autoclean

View File

@ -0,0 +1,239 @@
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import socket
import subprocess
import sys
import threading
import time
import redis
class KB_Instance(object):
# Check whether the HTTP Service is up running
@staticmethod
def check_http_service(target_url):
cmd = 'while true; do\n'
cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url)
cmd += 'if [ $? -eq 0 ]; then break; fi\n'
cmd += 'done'
return cmd
# Add static route
@staticmethod
def add_static_route(network, next_hop_ip, if_name=None):
debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip)
cmd = "sudo ip route add %s via %s" % (network, next_hop_ip)
if if_name:
debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Get static route
@staticmethod
def get_static_route(network, next_hop_ip=None, if_name=None):
cmd = "ip route show %s" % network
if next_hop_ip:
cmd += " via %s" % next_hop_ip
if if_name:
cmd += " dev %s" % if_name
return cmd
# Delete static route
@staticmethod
def delete_static_route(network, next_hop_ip=None, if_name=None):
debug_msg = "Deleting static route %s" % network
cmd = "sudo ip route del %s" % network
if next_hop_ip:
debug_msg = " with next hop %s" % next_hop_ip
cmd += " via %s" % next_hop_ip
if if_name:
if next_hop_ip:
debug_msg = " and %s" % if_name
else:
debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Run the HTTP benchmarking tool
@staticmethod
def run_http_test(dest_path, target_url, threads, connections,
rate_limit, duration, timeout, connection_type):
if not rate_limit:
rate_limit = 65535
cmd = '%s -t%d -c%d -R%d -d%ds --timeout %ds --latency --s kb.lua %s' % \
(dest_path, threads, connections, rate_limit, duration, timeout, target_url)
return cmd
class KB_VM_Agent(object):
def __init__(self, user_data):
host = user_data['redis_server']
port = user_data['redis_server_port']
self.user_data = user_data
self.redis_obj = redis.StrictRedis(host=host, port=port)
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.hello_thread = None
self.stop_hello = threading.Event()
# Assumption:
# Here we assume the vm_name is the same as the host name (lower case),
# which is true if the VM is spawned by Kloud Buster.
self.vm_name = socket.gethostname().lower()
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
self.last_cmd = None
def setup_channels(self):
# Check for connections to redis server
while (True):
try:
self.redis_obj.get("test")
except (redis.exceptions.ConnectionError):
time.sleep(1)
continue
break
# Subscribe to orchestration channel
self.pubsub.subscribe(self.orches_chan_name)
def report(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': self.vm_name,
'client-type': client_type, 'data': data}
self.redis_obj.publish(self.report_chan_name, message)
def send_hello(self):
# Sending "hello" message to master node every 2 seconds
while not self.stop_hello.is_set():
self.report('READY', None, None)
time.sleep(2)
def exec_command(self, cmd):
# Execute the command, and returns the outputs
cmds = ['bash', '-c']
cmds.append(cmd)
p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
return (p.returncode, stdout, stderr)
def process_cmd(self, message):
if message['cmd'] == 'ACK':
# When 'ACK' is received, means the master node
# acknowledged the current VM. So stopped sending more
# "hello" packet to the master node.
# Unfortunately, there is no thread.stop() in Python 2.x
self.stop_hello.set()
elif message['cmd'] == 'EXEC':
self.last_cmd = ""
try:
cmd_res_tuple = eval('self.exec_' + message['data']['cmd'] + '()')
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
except Exception as exc:
cmd_res_dict = {
"status": 1,
"stdout": self.last_cmd,
"stderr": str(exc)
}
self.report('DONE', message['client-type'], cmd_res_dict)
elif message['cmd'] == 'ABORT':
# TODO(Add support to abort a session)
pass
else:
# Unexpected
# TODO(Logging on Agent)
print 'ERROR: Unexpected command received!'
pass
def work(self):
for item in self.pubsub.listen():
if item['type'] != 'message':
continue
# Convert the string representation of dict to real dict obj
message = eval(item['data'])
self.process_cmd(message)
def exec_setup_static_route(self):
self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip'])
result = self.exec_command(self.last_cmd)
if (self.user_data['target_subnet_ip'] not in result[1]):
self.last_cmd = KB_Instance.add_static_route(
self.user_data['target_subnet_ip'],
self.user_data['target_shared_interface_ip'])
return self.exec_command(self.last_cmd)
else:
return (0, '', '')
def exec_check_http_service(self):
self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url'])
return self.exec_command(self.last_cmd)
def exec_run_http_test(self):
self.last_cmd = KB_Instance.run_http_test(
dest_path=self.user_data['http_tool']['dest_path'],
target_url=self.user_data['target_url'],
**self.user_data['http_tool_configs'])
return self.exec_command(self.last_cmd)
def exec_command(cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
return p.returncode
def start_redis_server():
cmd = ['sudo', 'service', 'redis-server', 'start']
return exec_command(cmd)
def start_nuttcp_server():
cmd = ['/var/tmp/nuttcp-7.3.2', '-P5002', '-S', '--single-threaded']
return exec_command(cmd)
def start_nginx_server():
cmd = ['sudo', 'service', 'nginx', 'start']
return exec_command(cmd)
if __name__ == "__main__":
try:
f = open('/var/tmp/user-data', 'r')
user_data = eval(f.read())
except Exception as e:
# TODO(Logging on Agent)
print e.message
sys.exit(1)
if 'role' not in user_data:
sys.exit(1)
if user_data['role'] == 'KB-PROXY':
sys.exit(start_redis_server())
if user_data['role'] == 'Server':
rc1 = start_nuttcp_server()
rc2 = start_nginx_server()
sys.exit(rc1 or rc2)
elif user_data['role'] == 'Client':
agent = KB_VM_Agent(user_data)
agent.setup_channels()
agent.hello_thread = threading.Thread(target=agent.send_hello)
agent.hello_thread.daemon = True
agent.hello_thread.start()
agent.work()
else:
sys.exit(1)

View File

@ -1,11 +0,0 @@
-- example reporting script which demonstrates a custom
-- done() function that prints latency percentiles as CSV
done = function(summary, latency, requests)
io.write("__START_KLOUDBUSTER_DATA__\n")
for _, p in pairs({ 50, 75, 90, 99, 99.9, 99.99, 99.999 }) do
n = latency:percentile(p)
io.write(string.format("%g,%d\n", p, n))
end
io.write("__END_KLOUDBUSTER_DATA__\n")
end

1
scale/kb.lua Symbolic link
View File

@ -0,0 +1 @@
dib/elements/kloudbuster/static/kb_test/kb.lua

234
scale/kb_runner.py Normal file
View File

@ -0,0 +1,234 @@
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import log as logging
import redis
LOG = logging.getLogger(__name__)
class KBVMUpException(Exception):
pass
class KBSetStaticRouteException(Exception):
pass
class KBHTTPServerUpException(Exception):
pass
class KBHTTPBenchException(Exception):
pass
class KBProxyConnectionException(Exception):
pass
class KBRunner(object):
"""
Control the testing VMs on the testing cloud
"""
def __init__(self, client_list, config, single_cloud=True):
self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list))
self.config = config
self.single_cloud = single_cloud
self.result = {}
self.host_stats = {}
self.tool_result = {}
# Redis
self.redis_obj = None
self.pubsub = None
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
def setup_redis(self, redis_server, redis_server_port=6379, timeout=120):
LOG.info("Setting up redis connection pool...")
# For now, the redis server is not in the scope of Kloud Buster, which has to be
# pre-configured before executing Kloud Buster.
connection_pool = redis.ConnectionPool(
host=redis_server, port=redis_server_port, db=0)
LOG.info("Setting up the redis connections...")
self.redis_obj = redis.StrictRedis(connection_pool=connection_pool,
socket_connect_timeout=1,
socket_timeout=1)
success = False
retry_count = max(timeout / self.config.polling_interval, 1)
# Check for connections to redis server
for retry in xrange(retry_count):
try:
self.redis_obj.get("test")
success = True
except (redis.exceptions.ConnectionError):
LOG.info("Connecting to redis server... Retry #%d/%d", retry, retry_count)
time.sleep(self.config.polling_interval)
continue
break
if not success:
LOG.error("Error: Cannot connect to the Redis server")
raise KBProxyConnectionException()
# Subscribe to message channel
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.pubsub.subscribe(self.report_chan_name)
def dispose(self):
if self.pubsub:
self.pubsub.unsubscribe()
self.pubsub.close()
def send_cmd(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': 'kb-master',
'client-type': client_type, 'data': data}
LOG.kbdebug(message)
self.redis_obj.publish(self.orches_chan_name, message)
def polling_vms(self, timeout, polling_interval=None):
'''
Polling all VMs for the status of execution
Guarantee to run once if the timeout is less than polling_interval
'''
if not polling_interval:
polling_interval = self.config.polling_interval
retry_count = max(timeout / polling_interval, 1)
retry = cnt_succ = cnt_failed = 0
clist = self.client_dict.copy()
while (retry < retry_count and len(clist)):
time.sleep(polling_interval)
while True:
msg = self.pubsub.get_message()
if not msg:
# No new message, commands are in executing
break
LOG.kbdebug(msg)
payload = eval(msg['data'])
vm_name = payload['sender-id']
instance = self.client_dict[vm_name]
cmd = payload['cmd']
if cmd == 'READY':
# If a READY packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all READY
# messages received afterwards.
if instance.up_flag:
continue
else:
clist[vm_name].up_flag = True
clist.pop(vm_name)
cnt_succ = cnt_succ + 1
elif cmd == 'DONE':
self.result[vm_name] = payload['data']
clist.pop(vm_name)
if self.result[vm_name]['status']:
# Command returned with non-zero status, command failed
LOG.error("[%s] %s", vm_name, self.result[vm_name]['stderr'])
cnt_failed = cnt_failed + 1
else:
# Command returned with zero, command succeed
cnt_succ = cnt_succ + 1
elif cmd == 'DEBUG':
LOG.info('[%s] %s' + (vm_name, payload['data']))
else:
LOG.error('[%s] received invalid command: %s' + (vm_name, cmd))
LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" %
(cnt_succ, cnt_failed, len(clist), retry))
retry = retry + 1
return (cnt_succ, cnt_failed, len(clist))
def wait_for_vm_up(self, timeout=300):
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBVMUpException()
self.send_cmd('ACK', None, None)
def setup_static_route(self, timeout=10):
func = {'cmd': 'setup_static_route'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBSetStaticRouteException()
def check_http_service(self, timeout=30):
func = {'cmd': 'check_http_service'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBHTTPServerUpException()
def run_http_test(self):
func = {'cmd': 'run_http_test'}
self.send_cmd('EXEC', 'http', func)
# Give additional 30 seconds for everybody to report results
timeout = self.config.http_tool_configs.duration + 30
cnt_pending = self.polling_vms(timeout)[2]
if cnt_pending != 0:
LOG.warn("Testing VMs are not returning results within grace period, "
"summary shown below may not be accurate!")
# Parse the results from HTTP Tools
for key, instance in self.client_dict.items():
self.result[key] = instance.http_client_parser(**self.result[key])
def gen_host_stats(self):
for vm in self.result.keys():
phy_host = self.client_dict[vm].host
if phy_host not in self.host_stats:
self.host_stats[phy_host] = []
self.host_stats[phy_host].append(self.result[vm])
http_tool = self.client_dict.values()[0].http_tool
for phy_host in self.host_stats:
self.host_stats[phy_host] = http_tool.consolidate_results(self.host_stats[phy_host])
def run(self):
try:
LOG.info("Waiting for agents on VMs to come up...")
self.wait_for_vm_up()
if self.single_cloud:
LOG.info("Setting up static route to reach tested cloud...")
self.setup_static_route()
LOG.info("Waiting for HTTP service to come up...")
self.check_http_service()
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
raw_input()
LOG.info("Starting HTTP Benchmarking...")
self.run_http_test()
# Call the method in corresponding tools to consolidate results
http_tool = self.client_dict.values()[0].http_tool
LOG.kbdebug(self.result.values())
self.tool_result = http_tool.consolidate_results(self.result.values())
self.tool_result['http_rate_limit'] = self.config.http_tool_configs.rate_limit
self.tool_result['total_connections'] =\
len(self.client_dict) * self.config.http_tool_configs.connections
self.gen_host_stats()
except (KBSetStaticRouteException):
LOG.error("Could not set static route.")
return
except (KBHTTPServerUpException):
LOG.error("HTTP service is not up in testing cloud.")
return
except KBHTTPBenchException():
LOG.error("Error in HTTP benchmarking.")
return

View File

@ -12,210 +12,56 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import log as logging
import redis
LOG = logging.getLogger(__name__)
class KBVMUpException(Exception):
class KBVMMappingAlgoNotSup(Exception):
pass
class KBSetStaticRouteException(Exception):
pass
class KBHTTPServerUpException(Exception):
pass
class KBHTTPBenchException(Exception):
pass
class KBProxyConnectionException(Exception):
class KBVMPlacementAlgoNotSup(Exception):
pass
class KBScheduler(object):
"""
Control the testing VMs on the testing cloud
1. VM Placements
2. Mapping client VMs to target servers
"""
def __init__(self, client_list, config, single_cloud=True):
self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list))
self.config = config
self.single_cloud = single_cloud
self.result = {}
self.tool_result = {}
# Redis
self.redis_obj = None
self.pubsub = None
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
def setup_redis(self, redis_server, redis_server_port=6379, timeout=120):
LOG.info("Setting up redis connection pool...")
# For now, the redis server is not in the scope of Kloud Buster, which has to be
# pre-configured before executing Kloud Buster.
connection_pool = redis.ConnectionPool(
host=redis_server, port=redis_server_port, db=0)
LOG.info("Setting up the redis connections...")
self.redis_obj = redis.StrictRedis(connection_pool=connection_pool,
socket_connect_timeout=1)
success = False
retry_count = max(timeout / self.config.polling_interval, 1)
# Check for connections to redis server
for retry in xrange(retry_count):
try:
self.redis_obj.get("test")
success = True
except (redis.exceptions.ConnectionError):
LOG.info("Connecting to redis server... Retry #%d/%d", retry, retry_count)
time.sleep(self.config.polling_interval)
continue
break
if not success:
LOG.error("Error: Cannot connect to the Redis server")
raise KBProxyConnectionException()
# Subscribe to message channel
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.pubsub.subscribe(self.report_chan_name)
def dispose(self):
if self.pubsub:
self.pubsub.unsubscribe()
self.pubsub.close()
def send_cmd(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': 'kb-master',
'client-type': client_type, 'data': data}
LOG.kbdebug(message)
self.redis_obj.publish(self.orches_chan_name, message)
def polling_vms(self, timeout, polling_interval=None):
'''
Polling all VMs for the status of execution
Guarantee to run once if the timeout is less than polling_interval
'''
if not polling_interval:
polling_interval = self.config.polling_interval
retry_count = max(timeout / polling_interval, 1)
retry = cnt_succ = cnt_failed = 0
clist = self.client_dict.copy()
while (retry < retry_count and len(clist)):
time.sleep(polling_interval)
while True:
msg = self.pubsub.get_message()
if not msg:
# No new message, commands are in executing
break
LOG.kbdebug(msg)
payload = eval(msg['data'])
vm_name = payload['sender-id']
instance = self.client_dict[vm_name]
cmd = payload['cmd']
if cmd == 'READY':
# If a READY packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all READY
# messages received afterwards.
if instance.up_flag:
continue
else:
clist[vm_name].up_flag = True
clist.pop(vm_name)
cnt_succ = cnt_succ + 1
elif cmd == 'DONE':
self.result[vm_name] = payload['data']
clist.pop(vm_name)
if self.result[vm_name]['status']:
# Command returned with non-zero status, command failed
LOG.error("[%s] %s", vm_name, self.result[vm_name]['stderr'])
cnt_failed = cnt_failed + 1
else:
# Command returned with zero, command succeed
cnt_succ = cnt_succ + 1
elif cmd == 'DEBUG':
LOG.info('[%s] %s' + (vm_name, payload['data']))
else:
LOG.error('[%s] received invalid command: %s' + (vm_name, cmd))
LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" %
(cnt_succ, cnt_failed, len(clist), retry))
retry = retry + 1
return (cnt_succ, cnt_failed, len(clist))
def wait_for_vm_up(self, timeout=300):
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBVMUpException()
self.send_cmd('ACK', None, None)
def setup_static_route(self, timeout=10):
func = {'cmd': 'setup_static_route'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBSetStaticRouteException()
def check_http_service(self, timeout=30):
func = {'cmd': 'check_http_service'}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBHTTPServerUpException()
def run_http_test(self):
func = {'cmd': 'run_http_test'}
LOG.info(func)
self.send_cmd('EXEC', 'http', func)
# Give additional 30 seconds for everybody to report results
timeout = self.config.http_tool_configs.duration + 30
cnt_pending = self.polling_vms(timeout)[2]
if cnt_pending != 0:
LOG.warn("Testing VMs are not returning results within grace period, "
"summary shown below may not be accurate!")
# Parse the results from HTTP Tools
for key, instance in self.client_dict.items():
self.result[key] = instance.http_client_parser(**self.result[key])
def run(self):
try:
LOG.info("Waiting for agents on VMs to come up...")
self.wait_for_vm_up()
if self.single_cloud:
LOG.info("Setting up static route to reach tested cloud...")
self.setup_static_route()
LOG.info("Waiting for HTTP service to come up...")
self.check_http_service()
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
raw_input()
LOG.info("Starting HTTP Benchmarking...")
self.run_http_test()
# Call the method in corresponding tools to consolidate results
http_tool = self.client_dict.values()[0].http_tool
LOG.kbdebug(self.result.values())
self.tool_result = http_tool.consolidate_results(self.result.values())
self.tool_result['http_rate_limit'] = self.config.http_tool_configs.rate_limit
self.tool_result['total_connections'] =\
len(self.client_dict) * self.config.http_tool_configs.connections
except (KBSetStaticRouteException):
LOG.error("Could not set static route.")
return
except (KBHTTPServerUpException):
LOG.error("HTTP service is not up in testing cloud.")
return
except KBHTTPBenchException():
LOG.error("Error in HTTP benchmarking.")
@staticmethod
def setup_vm_placement(role, vm_list, topology, avail_zone, algorithm):
if not topology:
# Will use nova-scheduler to pick up the hypervisors
return
if not avail_zone:
# Default availability zone in NOVA
avail_zone = "nova"
if role == "Server":
host_list = topology.servers_rack.split()
else:
host_list = topology.clients_rack.split()
host_count = len(host_list)
if algorithm == "Round-robin":
host_idx = 0
for ins in vm_list:
ins.boot_info['avail_zone'] = "%s:%s" % (avail_zone, host_list[host_idx])
host_idx = (host_idx + 1) % host_count
else:
LOG.error("Unsupported algorithm!")
raise KBVMPlacementAlgoNotSup()
@staticmethod
def setup_vm_mappings(client_list, server_list, algorithm):
# VM Mapping framework/algorithm to mapping clients to servers.
# e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc.
# Here we only support N*1:1, i.e. 1 client VM maps to 1 server VM, total of N pairs.
if algorithm == "1:1":
for idx, ins in enumerate(client_list):
ins.target_url = "http://%s/index.html" %\
(server_list[idx].fip_ip or server_list[idx].fixed_ip)
ins.user_data['target_url'] = ins.target_url
else:
LOG.error("Unsupported algorithm!")
raise KBVMMappingAlgoNotSup()

View File

@ -1,239 +0,0 @@
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import socket
import subprocess
import sys
import threading
import time
import redis
class KB_Instance(object):
# Check whether the HTTP Service is up running
@staticmethod
def check_http_service(target_url):
cmd = 'while true; do\n'
cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url)
cmd += 'if [ $? -eq 0 ]; then break; fi\n'
cmd += 'done'
return cmd
# Add static route
@staticmethod
def add_static_route(network, next_hop_ip, if_name=None):
debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip)
cmd = "sudo ip route add %s via %s" % (network, next_hop_ip)
if if_name:
debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Get static route
@staticmethod
def get_static_route(network, next_hop_ip=None, if_name=None):
cmd = "ip route show %s" % network
if next_hop_ip:
cmd += " via %s" % next_hop_ip
if if_name:
cmd += " dev %s" % if_name
return cmd
# Delete static route
@staticmethod
def delete_static_route(network, next_hop_ip=None, if_name=None):
debug_msg = "Deleting static route %s" % network
cmd = "sudo ip route del %s" % network
if next_hop_ip:
debug_msg = " with next hop %s" % next_hop_ip
cmd += " via %s" % next_hop_ip
if if_name:
if next_hop_ip:
debug_msg = " and %s" % if_name
else:
debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name
# TODO(Logging on Agent)
print debug_msg
return cmd
# Run the HTTP benchmarking tool
@staticmethod
def run_http_test(dest_path, target_url, threads, connections,
rate_limit, duration, timeout, connection_type):
if not rate_limit:
rate_limit = 65535
cmd = '%s -t%d -c%d -R%d -d%ds --timeout %ds --latency --s kb.lua %s' % \
(dest_path, threads, connections, rate_limit, duration, timeout, target_url)
return cmd
class KB_VM_Agent(object):
def __init__(self, user_data):
host = user_data['redis_server']
port = user_data['redis_server_port']
self.user_data = user_data
self.redis_obj = redis.StrictRedis(host=host, port=port)
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.hello_thread = None
self.stop_hello = threading.Event()
# Assumption:
# Here we assume the vm_name is the same as the host name (lower case),
# which is true if the VM is spawned by Kloud Buster.
self.vm_name = socket.gethostname().lower()
self.orches_chan_name = "kloudbuster_orches"
self.report_chan_name = "kloudbuster_report"
self.last_cmd = None
def setup_channels(self):
# Check for connections to redis server
while (True):
try:
self.redis_obj.get("test")
except (redis.exceptions.ConnectionError):
time.sleep(1)
continue
break
# Subscribe to orchestration channel
self.pubsub.subscribe(self.orches_chan_name)
def report(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': self.vm_name,
'client-type': client_type, 'data': data}
self.redis_obj.publish(self.report_chan_name, message)
def send_hello(self):
# Sending "hello" message to master node every 2 seconds
while not self.stop_hello.is_set():
self.report('READY', None, None)
time.sleep(2)
def exec_command(self, cmd):
# Execute the command, and returns the outputs
cmds = ['bash', '-c']
cmds.append(cmd)
p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
return (p.returncode, stdout, stderr)
def process_cmd(self, message):
if message['cmd'] == 'ACK':
# When 'ACK' is received, means the master node
# acknowledged the current VM. So stopped sending more
# "hello" packet to the master node.
# Unfortunately, there is no thread.stop() in Python 2.x
self.stop_hello.set()
elif message['cmd'] == 'EXEC':
self.last_cmd = ""
try:
cmd_res_tuple = eval('self.exec_' + message['data']['cmd'] + '()')
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
except Exception as exc:
cmd_res_dict = {
"status": 1,
"stdout": self.last_cmd,
"stderr": str(exc)
}
self.report('DONE', message['client-type'], cmd_res_dict)
elif message['cmd'] == 'ABORT':
# TODO(Add support to abort a session)
pass
else:
# Unexpected
# TODO(Logging on Agent)
print 'ERROR: Unexpected command received!'
pass
def work(self):
for item in self.pubsub.listen():
if item['type'] != 'message':
continue
# Convert the string representation of dict to real dict obj
message = eval(item['data'])
self.process_cmd(message)
def exec_setup_static_route(self):
self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip'])
result = self.exec_command(self.last_cmd)
if (self.user_data['target_subnet_ip'] not in result[1]):
self.last_cmd = KB_Instance.add_static_route(
self.user_data['target_subnet_ip'],
self.user_data['target_shared_interface_ip'])
return self.exec_command(self.last_cmd)
else:
return (0, '', '')
def exec_check_http_service(self):
self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url'])
return self.exec_command(self.last_cmd)
def exec_run_http_test(self):
self.last_cmd = KB_Instance.run_http_test(
dest_path=self.user_data['http_tool']['dest_path'],
target_url=self.user_data['target_url'],
**self.user_data['http_tool_configs'])
return self.exec_command(self.last_cmd)
def exec_command(cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
return p.returncode
def start_redis_server():
cmd = ['sudo', 'service', 'redis-server', 'start']
return exec_command(cmd)
def start_nuttcp_server():
cmd = ['/var/tmp/nuttcp-7.3.2', '-P5002', '-S', '--single-threaded']
return exec_command(cmd)
def start_nginx_server():
cmd = ['sudo', 'service', 'nginx', 'start']
return exec_command(cmd)
if __name__ == "__main__":
try:
f = open('/var/tmp/user-data', 'r')
user_data = eval(f.read())
except Exception as e:
# TODO(Logging on Agent)
print e.message
sys.exit(1)
if 'role' not in user_data:
sys.exit(1)
if user_data['role'] == 'KB-PROXY':
sys.exit(start_redis_server())
if user_data['role'] == 'Server':
rc1 = start_nuttcp_server()
rc2 = start_nginx_server()
sys.exit(rc1 or rc2)
elif user_data['role'] == 'Client':
agent = KB_VM_Agent(user_data)
agent.setup_channels()
agent.hello_thread = threading.Thread(target=agent.send_hello)
agent.hello_thread.daemon = True
agent.hello_thread.start()
agent.work()
else:
sys.exit(1)

1
scale/kb_vm_agent.py Symbolic link
View File

@ -0,0 +1 @@
dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py

View File

@ -19,8 +19,10 @@ import sys
import threading
import traceback
import base_compute
import base_network
import configure
from kb_runner import KBRunner
from kb_scheduler import KBScheduler
from keystoneclient.v2_0 import client as keystoneclient
import log as logging
@ -89,7 +91,26 @@ class Kloud(object):
self.tenant_list.append(new_tenant)
new_tenant.create_resources()
# Create flavors for servers, clients, and kb-proxy nodes
nova_client = self.tenant_list[0].user_list[0].nova_client
flavor_manager = base_compute.Flavor(nova_client)
flavor_dict = self.scale_cfg.flavor
if self.testing_side:
flavor_manager.create_flavor('kb.client', override=True, **flavor_dict)
flavor_manager.create_flavor('kb.proxy', override=True, ram=2048, vcpus=1, disk=20)
else:
flavor_manager.create_flavor('kb.server', override=True, **flavor_dict)
def delete_resources(self):
# Deleting flavors created by KloudBuster
nova_client = self.tenant_list[0].user_list[0].nova_client
flavor_manager = base_compute.Flavor(nova_client)
if self.testing_side:
flavor_manager.delete_flavor('kb.client')
flavor_manager.delete_flavor('kb.proxy')
else:
flavor_manager.delete_flavor('kb.server')
for tnt in self.tenant_list:
tnt.delete_resources()
@ -162,7 +183,7 @@ class KloudBuster(object):
4. Networks per router
5. Instances per network
"""
def __init__(self, server_cred, client_cred, server_cfg, client_cfg):
def __init__(self, server_cred, client_cred, server_cfg, client_cfg, topology):
# List of tenant objects to keep track of all tenants
self.tenant_list = []
self.tenant = None
@ -170,6 +191,7 @@ class KloudBuster(object):
self.tenant_testing = None
self.server_cfg = server_cfg
self.client_cfg = client_cfg
self.topology = topology
# TODO(check on same auth_url instead)
if server_cred == client_cred:
self.single_cloud = True
@ -209,27 +231,27 @@ class KloudBuster(object):
LOG.info("Preparing metadata for VMs... (%s)" % role)
if role == "Server":
svr_list = self.kloud.get_all_instances()
KBScheduler.setup_vm_placement(role, svr_list, self.topology,
self.kloud.placement_az, "Round-robin")
for ins in svr_list:
ins.user_data['role'] = "Server"
ins.boot_info['flavor_type'] = "kb.server"
ins.boot_info['user_data'] = str(ins.user_data)
elif role == "Client":
# We supposed to have a mapping framework/algorithm to mapping clients to servers.
# e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc.
# Here we are using N*1:1
client_list = self.testing_kloud.get_all_instances()
svr_list = self.kloud.get_all_instances()
KBScheduler.setup_vm_mappings(client_list, svr_list, "1:1")
KBScheduler.setup_vm_placement(role, client_list, self.topology,
self.testing_kloud.placement_az, "Round-robin")
for idx, ins in enumerate(client_list):
ins.target_url = "http://%s/index.html" %\
(svr_list[idx].fip_ip or svr_list[idx].fixed_ip)
ins.user_data['role'] = "Client"
ins.user_data['redis_server'] = self.kb_proxy.fixed_ip
ins.user_data['redis_server_port'] = 6379
ins.user_data['target_subnet_ip'] = svr_list[idx].subnet_ip
ins.user_data['target_shared_interface_ip'] = svr_list[idx].shared_interface_ip
ins.user_data['target_url'] = ins.target_url
ins.user_data['http_tool'] = ins.config['http_tool']
ins.user_data['http_tool_configs'] = ins.config['http_tool_configs']
ins.boot_info['flavor_type'] = "kb.client"
ins.boot_info['user_data'] = str(ins.user_data)
def run(self):
@ -238,13 +260,13 @@ class KloudBuster(object):
Executes tests serially
Support concurrency in fututure
"""
kbscheduler = None
kbrunner = None
vm_creation_concurrency = self.client_cfg.vm_creation_concurrency
try:
self.kloud.create_resources()
self.testing_kloud.create_resources()
# Start the scheduler and ready for the incoming redis messages
# Start the runner and ready for the incoming redis messages
client_list = self.testing_kloud.get_all_instances()
server_list = self.kloud.get_all_instances()
@ -254,12 +276,15 @@ class KloudBuster(object):
self.kb_proxy.vm_name = "KB-PROXY"
self.kb_proxy.user_data['role'] = 'KB-PROXY'
self.kb_proxy.boot_info['flavor_type'] = 'm1.small'
self.kb_proxy.boot_info['flavor_type'] = 'kb.proxy'
if self.testing_kloud.placement_az:
self.kb_proxy.boot_info['avail_zone'] = "%s:%s" %\
(self.testing_kloud.placement_az, self.topology.clients_rack.split()[0])
self.kb_proxy.boot_info['user_data'] = str(self.kb_proxy.user_data)
self.testing_kloud.create_vm(self.kb_proxy)
kbscheduler = KBScheduler(client_list, self.client_cfg, self.single_cloud)
kbscheduler.setup_redis(self.kb_proxy.fip_ip)
kbrunner = KBRunner(client_list, self.client_cfg, self.single_cloud)
kbrunner.setup_redis(self.kb_proxy.fip_ip)
if self.single_cloud:
# Find the shared network if the cloud used to testing is same
@ -292,11 +317,12 @@ class KloudBuster(object):
# Function that print all the provisioning info
self.print_provision_info()
# Run the scheduler to perform benchmarkings
kbscheduler.run()
self.final_result = kbscheduler.tool_result
# Run the runner to perform benchmarkings
kbrunner.run()
self.final_result = kbrunner.tool_result
self.final_result['total_server_vms'] = len(server_list)
self.final_result['total_client_vms'] = len(client_list)
# self.final_result['host_stats'] = kbrunner.host_stats
LOG.info(self.final_result)
except KeyboardInterrupt:
traceback.format_exc()
@ -315,8 +341,8 @@ class KloudBuster(object):
self.testing_kloud.delete_resources()
except Exception:
traceback.print_exc()
if kbscheduler:
kbscheduler.dispose()
if kbrunner:
kbrunner.dispose()
def get_total_vm_count(config):
return (config['number_tenants'] * config['users_per_tenant'] *
@ -355,6 +381,10 @@ if __name__ == '__main__':
short="c",
default=None,
help="Override default values with a config file"),
cfg.StrOpt("topology",
short="t",
default=None,
help="Topology files for compute hosts"),
cfg.StrOpt("tested-rc",
default=None,
help="Tested cloud openrc credentials file"),
@ -386,6 +416,11 @@ if __name__ == '__main__':
alt_config = configure.Configuration.from_file(CONF.config).configure()
config_scale = config_scale.merge(alt_config)
if CONF.topology:
topology = configure.Configuration.from_file(CONF.topology).configure()
else:
topology = None
# Retrieve the credentials
cred = credentials.Credentials(CONF.tested_rc, CONF.passwd_tested, CONF.no_env)
if CONF.testing_rc and CONF.testing_rc != CONF.tested_rc:
@ -427,7 +462,7 @@ if __name__ == '__main__':
# The KloudBuster class is just a wrapper class
# levarages tenant and user class for resource creations and
# deletion
kloudbuster = KloudBuster(cred, cred_testing, server_side_cfg, client_side_cfg)
kloudbuster = KloudBuster(cred, cred_testing, server_side_cfg, client_side_cfg, topology)
kloudbuster.run()
if CONF.json:

Binary file not shown.

Binary file not shown.

Binary file not shown.