diff --git a/.gitignore b/.gitignore index 857261e..69f33b8 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,4 @@ ChangeLog # KloudBuster *.html *.qcow2 - +scale/dib/kloudbuster.d/ diff --git a/scale/base_compute.py b/scale/base_compute.py index 75e0598..1f05a76 100644 --- a/scale/base_compute.py +++ b/scale/base_compute.py @@ -109,7 +109,6 @@ class BaseCompute(object): except Exception: return None - def find_flavor(self, flavor_type): """ Given a named flavor return the flavor @@ -208,3 +207,22 @@ class KeyPair(object): Remove the keypair created by KloudBuster """ self.novaclient.keypairs.delete(self.keypair) + + +class Flavor(object): + + def __init__(self, novaclient): + self.novaclient = novaclient + + def create_flavor(self, name, ram, vcpus, disk, override=False): + # Creating flavors + if override: + self.delete_flavor(name) + return self.novaclient.flavors.create(name=name, ram=ram, vcpus=vcpus, disk=disk) + + def delete_flavor(self, name): + try: + flavor = self.novaclient.flavors.find(name=name) + flavor.delete() + except Exception: + pass diff --git a/scale/base_network.py b/scale/base_network.py index 9aac382..70bc0a6 100644 --- a/scale/base_network.py +++ b/scale/base_network.py @@ -123,7 +123,6 @@ class BaseNetwork(object): # Create the VMs on specified network, first keypair, first secgroup perf_instance.boot_info['image_name'] = config_scale['image_name'] - perf_instance.boot_info['flavor_type'] = config_scale['flavor_type'] perf_instance.boot_info['keyname'] = self.router.user.key_name perf_instance.boot_info['nic'] = [{'net-id': self.network['id']}] perf_instance.boot_info['sec_group'] = self.secgroup_list[0].secgroup diff --git a/scale/cfg.scale.yaml b/scale/cfg.scale.yaml index 8b18742..ecbb774 100644 --- a/scale/cfg.scale.yaml +++ b/scale/cfg.scale.yaml @@ -5,9 +5,6 @@ # packages image_name: 'Scale Image v8' -# Flavor to use for the test images - the flavor name must exist in OpenStack -flavor_type: 'm1.small' - # Config options common to client and server side keystone_admin_role: "admin" @@ -15,7 +12,7 @@ keystone_admin_role: "admin" cleanup_resources: True # VM creation concurrency -vm_creation_concurrency: 5 +vm_creation_concurrency: 10 # # ssh access to the test VMs launched by kloudbuster is not required @@ -31,6 +28,15 @@ public_key_file: # SERVER SIDE CONFIG OPTIONS server: + # Flavor to use for the test images + flavor: + # Number of vCPUs for the flavor + vcpus: 1 + # Memory for the flavor in MB + ram: 2048 + # Size of local disk in GB + disk: 20 + # Number of tenants to be created on the cloud number_tenants: 1 @@ -46,13 +52,13 @@ server: networks_per_router: 1 # Number of VM instances to be created within the context of each Network - vms_per_network: 1 + vms_per_network: 2 # Number of security groups per network secgroups_per_network: 1 # Assign floating IP for every VM - use_floatingip: True + use_floatingip: False # Placement hint # Availability zone to use for servers in the server cloud @@ -66,7 +72,16 @@ server: # CLIENT SIDE CONFIG OPTIONS client: # Assign floating IP for every VM - use_floatingip: True + use_floatingip: False + + # Flavor to use for the test images + flavor: + # Number of vCPUs for the flavor + vcpus: 1 + # Memory for the flavor in MB + ram: 2048 + # Size of local disk in GB + disk: 20 # Placement hint # Availability zone to use for clients in the client cloud diff --git a/scale/cfg.topo.yaml b/scale/cfg.topo.yaml new file mode 100644 index 0000000..a5aa24d --- /dev/null +++ b/scale/cfg.topo.yaml @@ -0,0 +1,7 @@ +# Compute host topology file for running KloudBuster + +servers_rack: + hh23-5 + +clients_rack: + hh23-6 diff --git a/scale/dib/build-image.sh b/scale/dib/build-image.sh old mode 100644 new mode 100755 diff --git a/scale/dib/elements/kloudbuster/post-install.d/01-kb-script b/scale/dib/elements/kloudbuster/post-install.d/01-kb-script index 813723b..0093540 100755 --- a/scale/dib/elements/kloudbuster/post-install.d/01-kb-script +++ b/scale/dib/elements/kloudbuster/post-install.d/01-kb-script @@ -37,19 +37,7 @@ update-rc.d -f nginx remove sed -i "s/127.0.0.1/0.0.0.0/g" /etc/redis/redis.conf # if started nginx should be allowed to open more file descriptors -sed -i 's/start-stop-daemon\ --start/ulimit\ \-n\ 102400\n\ \ \ \ \0/g' /etc/init.d/nginx - -# copy the agent python script -# the file is in the same directory as ./dib/elements, so need to go up one level -( - IFS=: - for p in $ELEMENTS_PATH; do - if [ -f $p/../../kb_vm_agent.py ]; then - cp $p/../../kb_vm_agent.py /var/tmp - break - fi - done -) +sed -i 's/start-stop-daemon\ --start/ulimit\ \-n\ 102400\n\t\0/g' /etc/init.d/nginx # ====== # Client @@ -67,9 +55,8 @@ cd .. rm -rf wrk2 # uninstall unneeded packages -apt-get -y remove git -apt-get -y remove python-pip -apt-get -y remove build-essential +apt-get -y --purge remove git +apt-get -y --purge remove python-pip +apt-get -y --purge remove build-essential apt-get -y autoremove apt-get -y autoclean - diff --git a/scale/tools/kb.lua b/scale/dib/elements/kloudbuster/static/kb_test/kb.lua similarity index 100% rename from scale/tools/kb.lua rename to scale/dib/elements/kloudbuster/static/kb_test/kb.lua diff --git a/scale/dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py b/scale/dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py new file mode 100644 index 0000000..b2ab6fb --- /dev/null +++ b/scale/dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py @@ -0,0 +1,239 @@ +# Copyright 2015 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import socket +import subprocess +import sys +import threading +import time + +import redis + +class KB_Instance(object): + + # Check whether the HTTP Service is up running + @staticmethod + def check_http_service(target_url): + cmd = 'while true; do\n' + cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url) + cmd += 'if [ $? -eq 0 ]; then break; fi\n' + cmd += 'done' + return cmd + + # Add static route + @staticmethod + def add_static_route(network, next_hop_ip, if_name=None): + debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) + cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) + if if_name: + debug_msg += " and %s" % if_name + cmd += " dev %s" % if_name + # TODO(Logging on Agent) + print debug_msg + return cmd + + # Get static route + @staticmethod + def get_static_route(network, next_hop_ip=None, if_name=None): + cmd = "ip route show %s" % network + if next_hop_ip: + cmd += " via %s" % next_hop_ip + if if_name: + cmd += " dev %s" % if_name + return cmd + + # Delete static route + @staticmethod + def delete_static_route(network, next_hop_ip=None, if_name=None): + debug_msg = "Deleting static route %s" % network + cmd = "sudo ip route del %s" % network + if next_hop_ip: + debug_msg = " with next hop %s" % next_hop_ip + cmd += " via %s" % next_hop_ip + if if_name: + if next_hop_ip: + debug_msg = " and %s" % if_name + else: + debug_msg = "with next hop %s" % if_name + cmd += " dev %s" % if_name + # TODO(Logging on Agent) + print debug_msg + return cmd + + # Run the HTTP benchmarking tool + @staticmethod + def run_http_test(dest_path, target_url, threads, connections, + rate_limit, duration, timeout, connection_type): + if not rate_limit: + rate_limit = 65535 + cmd = '%s -t%d -c%d -R%d -d%ds --timeout %ds --latency --s kb.lua %s' % \ + (dest_path, threads, connections, rate_limit, duration, timeout, target_url) + return cmd + + +class KB_VM_Agent(object): + + def __init__(self, user_data): + host = user_data['redis_server'] + port = user_data['redis_server_port'] + self.user_data = user_data + self.redis_obj = redis.StrictRedis(host=host, port=port) + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.hello_thread = None + self.stop_hello = threading.Event() + # Assumption: + # Here we assume the vm_name is the same as the host name (lower case), + # which is true if the VM is spawned by Kloud Buster. + self.vm_name = socket.gethostname().lower() + self.orches_chan_name = "kloudbuster_orches" + self.report_chan_name = "kloudbuster_report" + self.last_cmd = None + + def setup_channels(self): + # Check for connections to redis server + while (True): + try: + self.redis_obj.get("test") + except (redis.exceptions.ConnectionError): + time.sleep(1) + continue + break + + # Subscribe to orchestration channel + self.pubsub.subscribe(self.orches_chan_name) + + def report(self, cmd, client_type, data): + message = {'cmd': cmd, 'sender-id': self.vm_name, + 'client-type': client_type, 'data': data} + self.redis_obj.publish(self.report_chan_name, message) + + def send_hello(self): + # Sending "hello" message to master node every 2 seconds + while not self.stop_hello.is_set(): + self.report('READY', None, None) + time.sleep(2) + + def exec_command(self, cmd): + # Execute the command, and returns the outputs + cmds = ['bash', '-c'] + cmds.append(cmd) + p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate() + + return (p.returncode, stdout, stderr) + + def process_cmd(self, message): + if message['cmd'] == 'ACK': + # When 'ACK' is received, means the master node + # acknowledged the current VM. So stopped sending more + # "hello" packet to the master node. + # Unfortunately, there is no thread.stop() in Python 2.x + self.stop_hello.set() + elif message['cmd'] == 'EXEC': + self.last_cmd = "" + try: + cmd_res_tuple = eval('self.exec_' + message['data']['cmd'] + '()') + cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple)) + except Exception as exc: + cmd_res_dict = { + "status": 1, + "stdout": self.last_cmd, + "stderr": str(exc) + } + self.report('DONE', message['client-type'], cmd_res_dict) + elif message['cmd'] == 'ABORT': + # TODO(Add support to abort a session) + pass + else: + # Unexpected + # TODO(Logging on Agent) + print 'ERROR: Unexpected command received!' + pass + + def work(self): + for item in self.pubsub.listen(): + if item['type'] != 'message': + continue + # Convert the string representation of dict to real dict obj + message = eval(item['data']) + self.process_cmd(message) + + def exec_setup_static_route(self): + self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip']) + result = self.exec_command(self.last_cmd) + if (self.user_data['target_subnet_ip'] not in result[1]): + self.last_cmd = KB_Instance.add_static_route( + self.user_data['target_subnet_ip'], + self.user_data['target_shared_interface_ip']) + return self.exec_command(self.last_cmd) + else: + return (0, '', '') + + def exec_check_http_service(self): + self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url']) + return self.exec_command(self.last_cmd) + + def exec_run_http_test(self): + self.last_cmd = KB_Instance.run_http_test( + dest_path=self.user_data['http_tool']['dest_path'], + target_url=self.user_data['target_url'], + **self.user_data['http_tool_configs']) + return self.exec_command(self.last_cmd) + +def exec_command(cmd): + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate() + + return p.returncode + +def start_redis_server(): + cmd = ['sudo', 'service', 'redis-server', 'start'] + return exec_command(cmd) + +def start_nuttcp_server(): + cmd = ['/var/tmp/nuttcp-7.3.2', '-P5002', '-S', '--single-threaded'] + return exec_command(cmd) + +def start_nginx_server(): + cmd = ['sudo', 'service', 'nginx', 'start'] + return exec_command(cmd) + +if __name__ == "__main__": + try: + f = open('/var/tmp/user-data', 'r') + user_data = eval(f.read()) + except Exception as e: + # TODO(Logging on Agent) + print e.message + sys.exit(1) + + if 'role' not in user_data: + sys.exit(1) + + if user_data['role'] == 'KB-PROXY': + sys.exit(start_redis_server()) + if user_data['role'] == 'Server': + rc1 = start_nuttcp_server() + rc2 = start_nginx_server() + sys.exit(rc1 or rc2) + elif user_data['role'] == 'Client': + agent = KB_VM_Agent(user_data) + agent.setup_channels() + agent.hello_thread = threading.Thread(target=agent.send_hello) + agent.hello_thread.daemon = True + agent.hello_thread.start() + agent.work() + else: + sys.exit(1) diff --git a/scale/kb.lua b/scale/kb.lua deleted file mode 100644 index 9ceedc6..0000000 --- a/scale/kb.lua +++ /dev/null @@ -1,11 +0,0 @@ --- example reporting script which demonstrates a custom --- done() function that prints latency percentiles as CSV - -done = function(summary, latency, requests) - io.write("__START_KLOUDBUSTER_DATA__\n") - for _, p in pairs({ 50, 75, 90, 99, 99.9, 99.99, 99.999 }) do - n = latency:percentile(p) - io.write(string.format("%g,%d\n", p, n)) - end - io.write("__END_KLOUDBUSTER_DATA__\n") -end diff --git a/scale/kb.lua b/scale/kb.lua new file mode 120000 index 0000000..7ca92cc --- /dev/null +++ b/scale/kb.lua @@ -0,0 +1 @@ +dib/elements/kloudbuster/static/kb_test/kb.lua \ No newline at end of file diff --git a/scale/kb_runner.py b/scale/kb_runner.py new file mode 100644 index 0000000..bd9512e --- /dev/null +++ b/scale/kb_runner.py @@ -0,0 +1,234 @@ +# Copyright 2015 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import log as logging +import redis + +LOG = logging.getLogger(__name__) + +class KBVMUpException(Exception): + pass + +class KBSetStaticRouteException(Exception): + pass + +class KBHTTPServerUpException(Exception): + pass + +class KBHTTPBenchException(Exception): + pass + +class KBProxyConnectionException(Exception): + pass + +class KBRunner(object): + """ + Control the testing VMs on the testing cloud + """ + + def __init__(self, client_list, config, single_cloud=True): + self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list)) + self.config = config + self.single_cloud = single_cloud + self.result = {} + self.host_stats = {} + self.tool_result = {} + + # Redis + self.redis_obj = None + self.pubsub = None + self.orches_chan_name = "kloudbuster_orches" + self.report_chan_name = "kloudbuster_report" + + def setup_redis(self, redis_server, redis_server_port=6379, timeout=120): + LOG.info("Setting up redis connection pool...") + # For now, the redis server is not in the scope of Kloud Buster, which has to be + # pre-configured before executing Kloud Buster. + connection_pool = redis.ConnectionPool( + host=redis_server, port=redis_server_port, db=0) + + LOG.info("Setting up the redis connections...") + self.redis_obj = redis.StrictRedis(connection_pool=connection_pool, + socket_connect_timeout=1, + socket_timeout=1) + success = False + retry_count = max(timeout / self.config.polling_interval, 1) + # Check for connections to redis server + for retry in xrange(retry_count): + try: + self.redis_obj.get("test") + success = True + except (redis.exceptions.ConnectionError): + LOG.info("Connecting to redis server... Retry #%d/%d", retry, retry_count) + time.sleep(self.config.polling_interval) + continue + break + if not success: + LOG.error("Error: Cannot connect to the Redis server") + raise KBProxyConnectionException() + + # Subscribe to message channel + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.pubsub.subscribe(self.report_chan_name) + + def dispose(self): + if self.pubsub: + self.pubsub.unsubscribe() + self.pubsub.close() + + def send_cmd(self, cmd, client_type, data): + message = {'cmd': cmd, 'sender-id': 'kb-master', + 'client-type': client_type, 'data': data} + LOG.kbdebug(message) + self.redis_obj.publish(self.orches_chan_name, message) + + def polling_vms(self, timeout, polling_interval=None): + ''' + Polling all VMs for the status of execution + Guarantee to run once if the timeout is less than polling_interval + ''' + if not polling_interval: + polling_interval = self.config.polling_interval + retry_count = max(timeout / polling_interval, 1) + retry = cnt_succ = cnt_failed = 0 + clist = self.client_dict.copy() + + while (retry < retry_count and len(clist)): + time.sleep(polling_interval) + while True: + msg = self.pubsub.get_message() + if not msg: + # No new message, commands are in executing + break + + LOG.kbdebug(msg) + payload = eval(msg['data']) + vm_name = payload['sender-id'] + instance = self.client_dict[vm_name] + cmd = payload['cmd'] + if cmd == 'READY': + # If a READY packet is received, the corresponding VM is up + # running. We mark the flag for that VM, and skip all READY + # messages received afterwards. + if instance.up_flag: + continue + else: + clist[vm_name].up_flag = True + clist.pop(vm_name) + cnt_succ = cnt_succ + 1 + elif cmd == 'DONE': + self.result[vm_name] = payload['data'] + clist.pop(vm_name) + if self.result[vm_name]['status']: + # Command returned with non-zero status, command failed + LOG.error("[%s] %s", vm_name, self.result[vm_name]['stderr']) + cnt_failed = cnt_failed + 1 + else: + # Command returned with zero, command succeed + cnt_succ = cnt_succ + 1 + elif cmd == 'DEBUG': + LOG.info('[%s] %s' + (vm_name, payload['data'])) + else: + LOG.error('[%s] received invalid command: %s' + (vm_name, cmd)) + + + LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" % + (cnt_succ, cnt_failed, len(clist), retry)) + retry = retry + 1 + + return (cnt_succ, cnt_failed, len(clist)) + + def wait_for_vm_up(self, timeout=300): + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_dict): + raise KBVMUpException() + self.send_cmd('ACK', None, None) + + def setup_static_route(self, timeout=10): + func = {'cmd': 'setup_static_route'} + self.send_cmd('EXEC', 'http', func) + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_dict): + raise KBSetStaticRouteException() + + def check_http_service(self, timeout=30): + func = {'cmd': 'check_http_service'} + self.send_cmd('EXEC', 'http', func) + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_dict): + raise KBHTTPServerUpException() + + def run_http_test(self): + func = {'cmd': 'run_http_test'} + self.send_cmd('EXEC', 'http', func) + # Give additional 30 seconds for everybody to report results + timeout = self.config.http_tool_configs.duration + 30 + cnt_pending = self.polling_vms(timeout)[2] + if cnt_pending != 0: + LOG.warn("Testing VMs are not returning results within grace period, " + "summary shown below may not be accurate!") + + # Parse the results from HTTP Tools + for key, instance in self.client_dict.items(): + self.result[key] = instance.http_client_parser(**self.result[key]) + + def gen_host_stats(self): + for vm in self.result.keys(): + phy_host = self.client_dict[vm].host + if phy_host not in self.host_stats: + self.host_stats[phy_host] = [] + self.host_stats[phy_host].append(self.result[vm]) + + http_tool = self.client_dict.values()[0].http_tool + for phy_host in self.host_stats: + self.host_stats[phy_host] = http_tool.consolidate_results(self.host_stats[phy_host]) + + def run(self): + try: + LOG.info("Waiting for agents on VMs to come up...") + self.wait_for_vm_up() + + if self.single_cloud: + LOG.info("Setting up static route to reach tested cloud...") + self.setup_static_route() + + LOG.info("Waiting for HTTP service to come up...") + self.check_http_service() + + if self.config.prompt_before_run: + print "Press enter to start running benchmarking tools..." + raw_input() + + LOG.info("Starting HTTP Benchmarking...") + self.run_http_test() + + # Call the method in corresponding tools to consolidate results + http_tool = self.client_dict.values()[0].http_tool + LOG.kbdebug(self.result.values()) + self.tool_result = http_tool.consolidate_results(self.result.values()) + self.tool_result['http_rate_limit'] = self.config.http_tool_configs.rate_limit + self.tool_result['total_connections'] =\ + len(self.client_dict) * self.config.http_tool_configs.connections + self.gen_host_stats() + except (KBSetStaticRouteException): + LOG.error("Could not set static route.") + return + except (KBHTTPServerUpException): + LOG.error("HTTP service is not up in testing cloud.") + return + except KBHTTPBenchException(): + LOG.error("Error in HTTP benchmarking.") + return diff --git a/scale/kb_scheduler.py b/scale/kb_scheduler.py index 860e746..7d09ae0 100644 --- a/scale/kb_scheduler.py +++ b/scale/kb_scheduler.py @@ -12,210 +12,56 @@ # License for the specific language governing permissions and limitations # under the License. -import time - import log as logging -import redis LOG = logging.getLogger(__name__) -class KBVMUpException(Exception): +class KBVMMappingAlgoNotSup(Exception): pass -class KBSetStaticRouteException(Exception): - pass - -class KBHTTPServerUpException(Exception): - pass - -class KBHTTPBenchException(Exception): - pass - -class KBProxyConnectionException(Exception): +class KBVMPlacementAlgoNotSup(Exception): pass class KBScheduler(object): """ - Control the testing VMs on the testing cloud + 1. VM Placements + 2. Mapping client VMs to target servers """ - def __init__(self, client_list, config, single_cloud=True): - self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list)) - self.config = config - self.single_cloud = single_cloud - self.result = {} - self.tool_result = {} - - # Redis - self.redis_obj = None - self.pubsub = None - self.orches_chan_name = "kloudbuster_orches" - self.report_chan_name = "kloudbuster_report" - - def setup_redis(self, redis_server, redis_server_port=6379, timeout=120): - LOG.info("Setting up redis connection pool...") - # For now, the redis server is not in the scope of Kloud Buster, which has to be - # pre-configured before executing Kloud Buster. - connection_pool = redis.ConnectionPool( - host=redis_server, port=redis_server_port, db=0) - - LOG.info("Setting up the redis connections...") - self.redis_obj = redis.StrictRedis(connection_pool=connection_pool, - socket_connect_timeout=1) - success = False - retry_count = max(timeout / self.config.polling_interval, 1) - # Check for connections to redis server - for retry in xrange(retry_count): - try: - self.redis_obj.get("test") - success = True - except (redis.exceptions.ConnectionError): - LOG.info("Connecting to redis server... Retry #%d/%d", retry, retry_count) - time.sleep(self.config.polling_interval) - continue - break - if not success: - LOG.error("Error: Cannot connect to the Redis server") - raise KBProxyConnectionException() - - # Subscribe to message channel - self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) - self.pubsub.subscribe(self.report_chan_name) - - def dispose(self): - if self.pubsub: - self.pubsub.unsubscribe() - self.pubsub.close() - - def send_cmd(self, cmd, client_type, data): - message = {'cmd': cmd, 'sender-id': 'kb-master', - 'client-type': client_type, 'data': data} - LOG.kbdebug(message) - self.redis_obj.publish(self.orches_chan_name, message) - - def polling_vms(self, timeout, polling_interval=None): - ''' - Polling all VMs for the status of execution - Guarantee to run once if the timeout is less than polling_interval - ''' - if not polling_interval: - polling_interval = self.config.polling_interval - retry_count = max(timeout / polling_interval, 1) - retry = cnt_succ = cnt_failed = 0 - clist = self.client_dict.copy() - - while (retry < retry_count and len(clist)): - time.sleep(polling_interval) - while True: - msg = self.pubsub.get_message() - if not msg: - # No new message, commands are in executing - break - - LOG.kbdebug(msg) - payload = eval(msg['data']) - vm_name = payload['sender-id'] - instance = self.client_dict[vm_name] - cmd = payload['cmd'] - if cmd == 'READY': - # If a READY packet is received, the corresponding VM is up - # running. We mark the flag for that VM, and skip all READY - # messages received afterwards. - if instance.up_flag: - continue - else: - clist[vm_name].up_flag = True - clist.pop(vm_name) - cnt_succ = cnt_succ + 1 - elif cmd == 'DONE': - self.result[vm_name] = payload['data'] - clist.pop(vm_name) - if self.result[vm_name]['status']: - # Command returned with non-zero status, command failed - LOG.error("[%s] %s", vm_name, self.result[vm_name]['stderr']) - cnt_failed = cnt_failed + 1 - else: - # Command returned with zero, command succeed - cnt_succ = cnt_succ + 1 - elif cmd == 'DEBUG': - LOG.info('[%s] %s' + (vm_name, payload['data'])) - else: - LOG.error('[%s] received invalid command: %s' + (vm_name, cmd)) - - - LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" % - (cnt_succ, cnt_failed, len(clist), retry)) - retry = retry + 1 - - return (cnt_succ, cnt_failed, len(clist)) - - def wait_for_vm_up(self, timeout=300): - cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_dict): - raise KBVMUpException() - self.send_cmd('ACK', None, None) - - def setup_static_route(self, timeout=10): - func = {'cmd': 'setup_static_route'} - self.send_cmd('EXEC', 'http', func) - cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_dict): - raise KBSetStaticRouteException() - - def check_http_service(self, timeout=30): - func = {'cmd': 'check_http_service'} - self.send_cmd('EXEC', 'http', func) - cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_dict): - raise KBHTTPServerUpException() - - def run_http_test(self): - func = {'cmd': 'run_http_test'} - LOG.info(func) - self.send_cmd('EXEC', 'http', func) - # Give additional 30 seconds for everybody to report results - timeout = self.config.http_tool_configs.duration + 30 - cnt_pending = self.polling_vms(timeout)[2] - if cnt_pending != 0: - LOG.warn("Testing VMs are not returning results within grace period, " - "summary shown below may not be accurate!") - - # Parse the results from HTTP Tools - for key, instance in self.client_dict.items(): - self.result[key] = instance.http_client_parser(**self.result[key]) - - def run(self): - try: - LOG.info("Waiting for agents on VMs to come up...") - self.wait_for_vm_up() - - if self.single_cloud: - LOG.info("Setting up static route to reach tested cloud...") - self.setup_static_route() - - LOG.info("Waiting for HTTP service to come up...") - self.check_http_service() - - if self.config.prompt_before_run: - print "Press enter to start running benchmarking tools..." - raw_input() - - LOG.info("Starting HTTP Benchmarking...") - self.run_http_test() - - # Call the method in corresponding tools to consolidate results - http_tool = self.client_dict.values()[0].http_tool - LOG.kbdebug(self.result.values()) - self.tool_result = http_tool.consolidate_results(self.result.values()) - self.tool_result['http_rate_limit'] = self.config.http_tool_configs.rate_limit - self.tool_result['total_connections'] =\ - len(self.client_dict) * self.config.http_tool_configs.connections - except (KBSetStaticRouteException): - LOG.error("Could not set static route.") - return - except (KBHTTPServerUpException): - LOG.error("HTTP service is not up in testing cloud.") - return - except KBHTTPBenchException(): - LOG.error("Error in HTTP benchmarking.") + @staticmethod + def setup_vm_placement(role, vm_list, topology, avail_zone, algorithm): + if not topology: + # Will use nova-scheduler to pick up the hypervisors return + if not avail_zone: + # Default availability zone in NOVA + avail_zone = "nova" + + if role == "Server": + host_list = topology.servers_rack.split() + else: + host_list = topology.clients_rack.split() + host_count = len(host_list) + + if algorithm == "Round-robin": + host_idx = 0 + for ins in vm_list: + ins.boot_info['avail_zone'] = "%s:%s" % (avail_zone, host_list[host_idx]) + host_idx = (host_idx + 1) % host_count + else: + LOG.error("Unsupported algorithm!") + raise KBVMPlacementAlgoNotSup() + + @staticmethod + def setup_vm_mappings(client_list, server_list, algorithm): + # VM Mapping framework/algorithm to mapping clients to servers. + # e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc. + # Here we only support N*1:1, i.e. 1 client VM maps to 1 server VM, total of N pairs. + if algorithm == "1:1": + for idx, ins in enumerate(client_list): + ins.target_url = "http://%s/index.html" %\ + (server_list[idx].fip_ip or server_list[idx].fixed_ip) + ins.user_data['target_url'] = ins.target_url + else: + LOG.error("Unsupported algorithm!") + raise KBVMMappingAlgoNotSup() diff --git a/scale/kb_vm_agent.py b/scale/kb_vm_agent.py deleted file mode 100644 index b2ab6fb..0000000 --- a/scale/kb_vm_agent.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2015 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import socket -import subprocess -import sys -import threading -import time - -import redis - -class KB_Instance(object): - - # Check whether the HTTP Service is up running - @staticmethod - def check_http_service(target_url): - cmd = 'while true; do\n' - cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url) - cmd += 'if [ $? -eq 0 ]; then break; fi\n' - cmd += 'done' - return cmd - - # Add static route - @staticmethod - def add_static_route(network, next_hop_ip, if_name=None): - debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) - cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) - if if_name: - debug_msg += " and %s" % if_name - cmd += " dev %s" % if_name - # TODO(Logging on Agent) - print debug_msg - return cmd - - # Get static route - @staticmethod - def get_static_route(network, next_hop_ip=None, if_name=None): - cmd = "ip route show %s" % network - if next_hop_ip: - cmd += " via %s" % next_hop_ip - if if_name: - cmd += " dev %s" % if_name - return cmd - - # Delete static route - @staticmethod - def delete_static_route(network, next_hop_ip=None, if_name=None): - debug_msg = "Deleting static route %s" % network - cmd = "sudo ip route del %s" % network - if next_hop_ip: - debug_msg = " with next hop %s" % next_hop_ip - cmd += " via %s" % next_hop_ip - if if_name: - if next_hop_ip: - debug_msg = " and %s" % if_name - else: - debug_msg = "with next hop %s" % if_name - cmd += " dev %s" % if_name - # TODO(Logging on Agent) - print debug_msg - return cmd - - # Run the HTTP benchmarking tool - @staticmethod - def run_http_test(dest_path, target_url, threads, connections, - rate_limit, duration, timeout, connection_type): - if not rate_limit: - rate_limit = 65535 - cmd = '%s -t%d -c%d -R%d -d%ds --timeout %ds --latency --s kb.lua %s' % \ - (dest_path, threads, connections, rate_limit, duration, timeout, target_url) - return cmd - - -class KB_VM_Agent(object): - - def __init__(self, user_data): - host = user_data['redis_server'] - port = user_data['redis_server_port'] - self.user_data = user_data - self.redis_obj = redis.StrictRedis(host=host, port=port) - self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) - self.hello_thread = None - self.stop_hello = threading.Event() - # Assumption: - # Here we assume the vm_name is the same as the host name (lower case), - # which is true if the VM is spawned by Kloud Buster. - self.vm_name = socket.gethostname().lower() - self.orches_chan_name = "kloudbuster_orches" - self.report_chan_name = "kloudbuster_report" - self.last_cmd = None - - def setup_channels(self): - # Check for connections to redis server - while (True): - try: - self.redis_obj.get("test") - except (redis.exceptions.ConnectionError): - time.sleep(1) - continue - break - - # Subscribe to orchestration channel - self.pubsub.subscribe(self.orches_chan_name) - - def report(self, cmd, client_type, data): - message = {'cmd': cmd, 'sender-id': self.vm_name, - 'client-type': client_type, 'data': data} - self.redis_obj.publish(self.report_chan_name, message) - - def send_hello(self): - # Sending "hello" message to master node every 2 seconds - while not self.stop_hello.is_set(): - self.report('READY', None, None) - time.sleep(2) - - def exec_command(self, cmd): - # Execute the command, and returns the outputs - cmds = ['bash', '-c'] - cmds.append(cmd) - p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = p.communicate() - - return (p.returncode, stdout, stderr) - - def process_cmd(self, message): - if message['cmd'] == 'ACK': - # When 'ACK' is received, means the master node - # acknowledged the current VM. So stopped sending more - # "hello" packet to the master node. - # Unfortunately, there is no thread.stop() in Python 2.x - self.stop_hello.set() - elif message['cmd'] == 'EXEC': - self.last_cmd = "" - try: - cmd_res_tuple = eval('self.exec_' + message['data']['cmd'] + '()') - cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple)) - except Exception as exc: - cmd_res_dict = { - "status": 1, - "stdout": self.last_cmd, - "stderr": str(exc) - } - self.report('DONE', message['client-type'], cmd_res_dict) - elif message['cmd'] == 'ABORT': - # TODO(Add support to abort a session) - pass - else: - # Unexpected - # TODO(Logging on Agent) - print 'ERROR: Unexpected command received!' - pass - - def work(self): - for item in self.pubsub.listen(): - if item['type'] != 'message': - continue - # Convert the string representation of dict to real dict obj - message = eval(item['data']) - self.process_cmd(message) - - def exec_setup_static_route(self): - self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip']) - result = self.exec_command(self.last_cmd) - if (self.user_data['target_subnet_ip'] not in result[1]): - self.last_cmd = KB_Instance.add_static_route( - self.user_data['target_subnet_ip'], - self.user_data['target_shared_interface_ip']) - return self.exec_command(self.last_cmd) - else: - return (0, '', '') - - def exec_check_http_service(self): - self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url']) - return self.exec_command(self.last_cmd) - - def exec_run_http_test(self): - self.last_cmd = KB_Instance.run_http_test( - dest_path=self.user_data['http_tool']['dest_path'], - target_url=self.user_data['target_url'], - **self.user_data['http_tool_configs']) - return self.exec_command(self.last_cmd) - -def exec_command(cmd): - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = p.communicate() - - return p.returncode - -def start_redis_server(): - cmd = ['sudo', 'service', 'redis-server', 'start'] - return exec_command(cmd) - -def start_nuttcp_server(): - cmd = ['/var/tmp/nuttcp-7.3.2', '-P5002', '-S', '--single-threaded'] - return exec_command(cmd) - -def start_nginx_server(): - cmd = ['sudo', 'service', 'nginx', 'start'] - return exec_command(cmd) - -if __name__ == "__main__": - try: - f = open('/var/tmp/user-data', 'r') - user_data = eval(f.read()) - except Exception as e: - # TODO(Logging on Agent) - print e.message - sys.exit(1) - - if 'role' not in user_data: - sys.exit(1) - - if user_data['role'] == 'KB-PROXY': - sys.exit(start_redis_server()) - if user_data['role'] == 'Server': - rc1 = start_nuttcp_server() - rc2 = start_nginx_server() - sys.exit(rc1 or rc2) - elif user_data['role'] == 'Client': - agent = KB_VM_Agent(user_data) - agent.setup_channels() - agent.hello_thread = threading.Thread(target=agent.send_hello) - agent.hello_thread.daemon = True - agent.hello_thread.start() - agent.work() - else: - sys.exit(1) diff --git a/scale/kb_vm_agent.py b/scale/kb_vm_agent.py new file mode 120000 index 0000000..6e357c2 --- /dev/null +++ b/scale/kb_vm_agent.py @@ -0,0 +1 @@ +dib/elements/kloudbuster/static/kb_test/kb_vm_agent.py \ No newline at end of file diff --git a/scale/kloudbuster.py b/scale/kloudbuster.py index c108a10..57cd6ab 100644 --- a/scale/kloudbuster.py +++ b/scale/kloudbuster.py @@ -19,8 +19,10 @@ import sys import threading import traceback +import base_compute import base_network import configure +from kb_runner import KBRunner from kb_scheduler import KBScheduler from keystoneclient.v2_0 import client as keystoneclient import log as logging @@ -89,7 +91,26 @@ class Kloud(object): self.tenant_list.append(new_tenant) new_tenant.create_resources() + # Create flavors for servers, clients, and kb-proxy nodes + nova_client = self.tenant_list[0].user_list[0].nova_client + flavor_manager = base_compute.Flavor(nova_client) + flavor_dict = self.scale_cfg.flavor + if self.testing_side: + flavor_manager.create_flavor('kb.client', override=True, **flavor_dict) + flavor_manager.create_flavor('kb.proxy', override=True, ram=2048, vcpus=1, disk=20) + else: + flavor_manager.create_flavor('kb.server', override=True, **flavor_dict) + def delete_resources(self): + # Deleting flavors created by KloudBuster + nova_client = self.tenant_list[0].user_list[0].nova_client + flavor_manager = base_compute.Flavor(nova_client) + if self.testing_side: + flavor_manager.delete_flavor('kb.client') + flavor_manager.delete_flavor('kb.proxy') + else: + flavor_manager.delete_flavor('kb.server') + for tnt in self.tenant_list: tnt.delete_resources() @@ -162,7 +183,7 @@ class KloudBuster(object): 4. Networks per router 5. Instances per network """ - def __init__(self, server_cred, client_cred, server_cfg, client_cfg): + def __init__(self, server_cred, client_cred, server_cfg, client_cfg, topology): # List of tenant objects to keep track of all tenants self.tenant_list = [] self.tenant = None @@ -170,6 +191,7 @@ class KloudBuster(object): self.tenant_testing = None self.server_cfg = server_cfg self.client_cfg = client_cfg + self.topology = topology # TODO(check on same auth_url instead) if server_cred == client_cred: self.single_cloud = True @@ -209,27 +231,27 @@ class KloudBuster(object): LOG.info("Preparing metadata for VMs... (%s)" % role) if role == "Server": svr_list = self.kloud.get_all_instances() + KBScheduler.setup_vm_placement(role, svr_list, self.topology, + self.kloud.placement_az, "Round-robin") for ins in svr_list: ins.user_data['role'] = "Server" + ins.boot_info['flavor_type'] = "kb.server" ins.boot_info['user_data'] = str(ins.user_data) elif role == "Client": - # We supposed to have a mapping framework/algorithm to mapping clients to servers. - # e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc. - # Here we are using N*1:1 client_list = self.testing_kloud.get_all_instances() svr_list = self.kloud.get_all_instances() - + KBScheduler.setup_vm_mappings(client_list, svr_list, "1:1") + KBScheduler.setup_vm_placement(role, client_list, self.topology, + self.testing_kloud.placement_az, "Round-robin") for idx, ins in enumerate(client_list): - ins.target_url = "http://%s/index.html" %\ - (svr_list[idx].fip_ip or svr_list[idx].fixed_ip) ins.user_data['role'] = "Client" ins.user_data['redis_server'] = self.kb_proxy.fixed_ip ins.user_data['redis_server_port'] = 6379 ins.user_data['target_subnet_ip'] = svr_list[idx].subnet_ip ins.user_data['target_shared_interface_ip'] = svr_list[idx].shared_interface_ip - ins.user_data['target_url'] = ins.target_url ins.user_data['http_tool'] = ins.config['http_tool'] ins.user_data['http_tool_configs'] = ins.config['http_tool_configs'] + ins.boot_info['flavor_type'] = "kb.client" ins.boot_info['user_data'] = str(ins.user_data) def run(self): @@ -238,13 +260,13 @@ class KloudBuster(object): Executes tests serially Support concurrency in fututure """ - kbscheduler = None + kbrunner = None vm_creation_concurrency = self.client_cfg.vm_creation_concurrency try: self.kloud.create_resources() self.testing_kloud.create_resources() - # Start the scheduler and ready for the incoming redis messages + # Start the runner and ready for the incoming redis messages client_list = self.testing_kloud.get_all_instances() server_list = self.kloud.get_all_instances() @@ -254,12 +276,15 @@ class KloudBuster(object): self.kb_proxy.vm_name = "KB-PROXY" self.kb_proxy.user_data['role'] = 'KB-PROXY' - self.kb_proxy.boot_info['flavor_type'] = 'm1.small' + self.kb_proxy.boot_info['flavor_type'] = 'kb.proxy' + if self.testing_kloud.placement_az: + self.kb_proxy.boot_info['avail_zone'] = "%s:%s" %\ + (self.testing_kloud.placement_az, self.topology.clients_rack.split()[0]) self.kb_proxy.boot_info['user_data'] = str(self.kb_proxy.user_data) self.testing_kloud.create_vm(self.kb_proxy) - kbscheduler = KBScheduler(client_list, self.client_cfg, self.single_cloud) - kbscheduler.setup_redis(self.kb_proxy.fip_ip) + kbrunner = KBRunner(client_list, self.client_cfg, self.single_cloud) + kbrunner.setup_redis(self.kb_proxy.fip_ip) if self.single_cloud: # Find the shared network if the cloud used to testing is same @@ -292,11 +317,12 @@ class KloudBuster(object): # Function that print all the provisioning info self.print_provision_info() - # Run the scheduler to perform benchmarkings - kbscheduler.run() - self.final_result = kbscheduler.tool_result + # Run the runner to perform benchmarkings + kbrunner.run() + self.final_result = kbrunner.tool_result self.final_result['total_server_vms'] = len(server_list) self.final_result['total_client_vms'] = len(client_list) + # self.final_result['host_stats'] = kbrunner.host_stats LOG.info(self.final_result) except KeyboardInterrupt: traceback.format_exc() @@ -315,8 +341,8 @@ class KloudBuster(object): self.testing_kloud.delete_resources() except Exception: traceback.print_exc() - if kbscheduler: - kbscheduler.dispose() + if kbrunner: + kbrunner.dispose() def get_total_vm_count(config): return (config['number_tenants'] * config['users_per_tenant'] * @@ -355,6 +381,10 @@ if __name__ == '__main__': short="c", default=None, help="Override default values with a config file"), + cfg.StrOpt("topology", + short="t", + default=None, + help="Topology files for compute hosts"), cfg.StrOpt("tested-rc", default=None, help="Tested cloud openrc credentials file"), @@ -386,6 +416,11 @@ if __name__ == '__main__': alt_config = configure.Configuration.from_file(CONF.config).configure() config_scale = config_scale.merge(alt_config) + if CONF.topology: + topology = configure.Configuration.from_file(CONF.topology).configure() + else: + topology = None + # Retrieve the credentials cred = credentials.Credentials(CONF.tested_rc, CONF.passwd_tested, CONF.no_env) if CONF.testing_rc and CONF.testing_rc != CONF.tested_rc: @@ -427,7 +462,7 @@ if __name__ == '__main__': # The KloudBuster class is just a wrapper class # levarages tenant and user class for resource creations and # deletion - kloudbuster = KloudBuster(cred, cred_testing, server_side_cfg, client_side_cfg) + kloudbuster = KloudBuster(cred, cred_testing, server_side_cfg, client_side_cfg, topology) kloudbuster.run() if CONF.json: diff --git a/scale/tools/nuttcp-7.3.2 b/scale/tools/nuttcp-7.3.2 deleted file mode 100755 index 607acca..0000000 Binary files a/scale/tools/nuttcp-7.3.2 and /dev/null differ diff --git a/scale/tools/wrk-4.0.1 b/scale/tools/wrk-4.0.1 deleted file mode 100755 index a10aeee..0000000 Binary files a/scale/tools/wrk-4.0.1 and /dev/null differ diff --git a/scale/tools/wrk2-3.1.1 b/scale/tools/wrk2-3.1.1 deleted file mode 100755 index b9ca792..0000000 Binary files a/scale/tools/wrk2-3.1.1 and /dev/null differ