From 8a9dda8f7d6403836bca2e45be00b28ea6b58f21 Mon Sep 17 00:00:00 2001 From: Yichen Wang Date: Thu, 16 Apr 2015 15:19:51 -0700 Subject: [PATCH] Use redis as the communication protocol 1. Supported to use redis as the orchestration and reporting protocol; 2. Implemented the agent to run on every testing VM; 3. Modified the HTTP tools to use redis messaging APIs; 4. Added APIs to display provision details for klouds; 5. Added the option to allow a prompt before running tools; 6. Cleanup unneeded code; Change-Id: I221ebbeb8a3001374699617dbd827de269419dab --- requirements.txt | 2 + scale/base_compute.py | 24 ++-- scale/base_network.py | 37 +++--- scale/cfg.scale.nimbus.yaml | 6 - scale/cfg.scale.yaml | 56 +++++++--- scale/kb_scheduler.py | 217 +++++++++++++++++++----------------- scale/kb_vm_agent.py | 102 +++++++++++++++++ scale/kloudbuster.py | 29 +++-- scale/log.py | 3 +- scale/perf_instance.py | 216 +++++++++++++++++------------------ scale/perf_tool.py | 36 ++---- scale/tenant.py | 4 +- scale/users.py | 8 +- scale/wrk_tool.py | 53 +++------ 14 files changed, 448 insertions(+), 345 deletions(-) delete mode 100644 scale/cfg.scale.nimbus.yaml create mode 100644 scale/kb_vm_agent.py diff --git a/requirements.txt b/requirements.txt index 92258e0..7108fa5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ ecdsa>=0.11 jsonpatch>=1.9 jsonschema>=2.4.0 lxml>=3.4.0 +oslo.log>=1.0.0 oslo.utils>=1.2.0 paramiko>=1.14.0 pycrypto>=2.6.1 @@ -19,5 +20,6 @@ python-neutronclient<3,>=2.3.6 python-novaclient>=2.18.1 python-openstackclient>=0.4.1 python-keystoneclient>=1.0.0 +redis>=2.10.3 scp>=0.8.0 tabulate>=0.7.3 diff --git a/scale/base_compute.py b/scale/base_compute.py index 40d156f..a7dd321 100644 --- a/scale/base_compute.py +++ b/scale/base_compute.py @@ -27,10 +27,10 @@ class BaseCompute(object): """ - def __init__(self, nova_client, user_name): + def __init__(self, vm_name, nova_client, user_name): self.novaclient = nova_client self.user_name = user_name - self.vm_name = None + self.vm_name = vm_name self.instance = None self.fip = None self.fip_ip = None @@ -43,11 +43,9 @@ class BaseCompute(object): # Create a server instance with associated # security group, keypair with a provided public key - def create_server(self, vmname, image_name, flavor_type, keyname, - nic, sec_group, public_key_file, - avail_zone=None, user_data=None, - config_drive=None, - retry_count=100): + def create_server(self, image_name, flavor_type, keyname, + nic, sec_group, avail_zone=None, user_data=None, + config_drive=None, retry_count=100): """ Create a VM instance given following parameters 1. VM Name @@ -59,12 +57,11 @@ class BaseCompute(object): """ # Get the image id and flavor id from their logical names - self.vm_name = vmname image = self.find_image(image_name) flavor_type = self.find_flavor(flavor_type) # Also attach the created security group for the test - instance = self.novaclient.servers.create(name=vmname, + instance = self.novaclient.servers.create(name=self.vm_name, image=image, flavor=flavor_type, key_name=keyname, @@ -73,7 +70,7 @@ class BaseCompute(object): userdata=user_data, config_drive=config_drive, security_groups=[sec_group.id]) - flag_exist = self.find_server(vmname, retry_count) + flag_exist = self.find_server(self.vm_name, retry_count) if flag_exist: self.instance = instance @@ -165,10 +162,9 @@ class SecGroup(object): self.novaclient.security_groups.delete(self.secgroup) break except Exception: - LOG.warn("Security group %s in use retry count: %d" % ( - self.secgroup_name, - retry_count)) - time.sleep(4) + LOG.warn("Security group %s in use. Retry #%d" % ( + self.secgroup_name, retry_count)) + time.sleep(2) class KeyPair(object): diff --git a/scale/base_network.py b/scale/base_network.py index faabdce..a4c0258 100644 --- a/scale/base_network.py +++ b/scale/base_network.py @@ -105,14 +105,14 @@ class BaseNetwork(object): for secgroup_count in range(config_scale['secgroups_per_network']): secgroup_instance = base_compute.SecGroup(self.nova_client) self.secgroup_list.append(secgroup_instance) - secgroup_name = network_prefix + "_SG" + str(secgroup_count) + secgroup_name = network_prefix + "-SG" + str(secgroup_count) secgroup_instance.create_secgroup_with_rules(secgroup_name) # Create the keypair list for keypair_count in range(config_scale['keypairs_per_network']): keypair_instance = base_compute.KeyPair(self.nova_client) self.keypair_list.append(keypair_instance) - keypair_name = network_prefix + "_K" + str(keypair_count) + keypair_name = network_prefix + "-K" + str(keypair_count) keypair_instance.add_public_key(keypair_name, config_scale['public_key_file']) # Create the required number of VMs @@ -120,30 +120,34 @@ class BaseNetwork(object): if config_scale['use_floatingip']: external_network = find_external_network(self.neutron_client) LOG.info("Creating Virtual machines for user %s" % self.user_name) + if 'redis_server' in config_scale: + # Here we are creating a testing VM (client), put the redis server + # information in the user_data. + redis_server = config_scale['redis_server'] + redis_server_port = config_scale['redis_server_port'] + user_data = redis_server + ":" + str(redis_server_port) + else: + user_data = None for instance_count in range(config_scale['vms_per_network']): - perf_instance = PerfInstance(self.nova_client, self.user_name) + vm_name = network_prefix + "-I" + str(instance_count) + perf_instance = PerfInstance(vm_name, self.nova_client, self.user_name, config_scale) self.instance_list.append(perf_instance) - vm_name = network_prefix + "_I" + str(instance_count) nic_used = [{'net-id': self.network['id']}] LOG.info("Creating Instance: " + vm_name) - perf_instance.create_server(vm_name, config_scale['image_name'], + perf_instance.create_server(config_scale['image_name'], config_scale['flavor_type'], self.keypair_list[0].keypair_name, nic_used, self.secgroup_list[0].secgroup, - config_scale['public_key_file'], - None, - None, - None) + user_data=user_data) # Store the subnet info and fixed ip address in instance perf_instance.subnet_ip = self.network['subnet_ip'] - LOG.info(perf_instance.instance.networks.values()) - LOG.info("++++++++++++++++++++++++++++++") perf_instance.fixed_ip = perf_instance.instance.networks.values()[0][0] if self.shared_interface_ip: perf_instance.shared_interface_ip = self.shared_interface_ip - # Create the floating ip for the instance store it and the ip address in instance object if config_scale['use_floatingip']: + # Create the floating ip for the instance + # store it and the ip address in instance object perf_instance.fip = create_floating_ip(self.neutron_client, external_network) perf_instance.fip_ip = perf_instance.fip['floatingip']['floating_ip_address'] # Associate the floating ip with this instance @@ -152,11 +156,6 @@ class BaseNetwork(object): else: # Store the fixed ip as ssh ip since there is no floating ip perf_instance.ssh_ip = perf_instance.fixed_ip - LOG.info("VM Information") - LOG.info("SSH IP:%s" % perf_instance.ssh_ip) - LOG.info("Subnet Info: %s" % perf_instance.subnet_ip) - if self.shared_interface_ip: - LOG.info("Shared router interface ip %s" % self.shared_interface_ip) def delete_compute_resources(self): """ @@ -182,7 +181,7 @@ class BaseNetwork(object): """ Create a network with 1 subnet inside it """ - subnet_name = "kloudbuster_subnet" + network_name + subnet_name = "kloudbuster_subnet_" + network_name body = { 'network': { 'name': network_name, @@ -263,7 +262,7 @@ class Router(object): self.shared_interface_ip) self.network_list.append(network_instance) # Create the network and subnet - network_name = self.user_name + "_N" + str(network_count) + network_name = self.user_name + "-N" + str(network_count) network_instance.create_network_and_subnet(network_name) # Attach the created network to router interface self.attach_router_interface(network_instance) diff --git a/scale/cfg.scale.nimbus.yaml b/scale/cfg.scale.nimbus.yaml deleted file mode 100644 index 286362f..0000000 --- a/scale/cfg.scale.nimbus.yaml +++ /dev/null @@ -1,6 +0,0 @@ -# KloudBuster Default configuration file -server: - flavor_type: 'CO2-Large' - -client: - flavor_type: 'CO2-Large' diff --git a/scale/cfg.scale.yaml b/scale/cfg.scale.yaml index 7f0ba4c..3dbe55d 100644 --- a/scale/cfg.scale.yaml +++ b/scale/cfg.scale.yaml @@ -1,7 +1,7 @@ # KloudBuster Default configuration file -server: +server: # Number of tenants to be created on the cloud - number_tenants: 2 + number_tenants: 1 # Number of Users to be created inside the tenant users_per_tenant: 1 @@ -15,23 +15,29 @@ server: routers_per_user: 1 # Number of VM instances to be created within the context of each User - vms_per_network: 1 + vms_per_network: 2 # Number of security groups per network secgroups_per_network: 1 # Number of keypairs per network keypairs_per_network: 1 - + + # Assign floating IP for every VM use_floatingip: False + # SSH configuration + ssh_vm_username: 'ubuntu' + ssh_retry_count: 50 + private_key_file: './ssh/id_rsa' + # Configs that remain constant keystone_admin_role: "admin" - cleanup_resources : True - public_key_file : '../ssh/id_rsa.pub' - image_name : 'Scale Image v3' + cleanup_resources: True + public_key_file: '../ssh/id_rsa.pub' + image_name: 'Scale Image v4' flavor_type: 'm1.small' - + client: # Number of tenants to be created on the cloud number_tenants: 1 @@ -48,7 +54,7 @@ client: routers_per_user: 1 # Number of VM instances to be created within the context of each User - vms_per_network: 2 + vms_per_network: 2 # Number of security groups per network secgroups_per_network: 1 @@ -56,14 +62,36 @@ client: # Number of keypairs per network keypairs_per_network: 1 - use_floatingip: True + # Assign floating IP for every VM + use_floatingip: False # Specify whether the testing cloud is running in same cloud run_on_same_cloud: True - + + # SSH configuration + ssh_vm_username: 'ubuntu' + ssh_retry_count: 50 + private_key_file: './ssh/id_rsa' + + # Redis server configuration + redis_server: '172.29.172.180' + redis_server_port: 6379 + redis_retry_count: 50 + polling_interval: 5 + + # Duration of testing tools (seconds) + exec_time: 30 + + # Tooling + tp_tool: 'nuttcp' + http_tool: 'wrk' + + # Prompt before running benchmarking tools + prompt_before_run: False + # Configs that remain constant keystone_admin_role: "admin" - cleanup_resources : True - public_key_file : '../ssh/id_rsa.pub' - image_name : 'Scale Image v3' + cleanup_resources: True + public_key_file: '../ssh/id_rsa.pub' + image_name: 'Scale Image v4' flavor_type: 'm1.small' diff --git a/scale/kb_scheduler.py b/scale/kb_scheduler.py index 27eac6d..eb38876 100644 --- a/scale/kb_scheduler.py +++ b/scale/kb_scheduler.py @@ -12,128 +12,137 @@ # License for the specific language governing permissions and limitations # under the License. -import threading -import traceback +import time import log as logging - -import sshutils +import redis LOG = logging.getLogger(__name__) +class KBVMUpException(Exception): + pass class KBSetStaticRouteException(Exception): pass +class KBHTTPServerUpException(Exception): + pass + +class KBHTTPBenchException(Exception): + pass + class KBScheduler(object): """ - Control the slave nodes on the testing cloud + Control the testing VMs on the testing cloud """ - """ - The code below are mostly a temporary solution, which assumes all testing - clients have their own floating IP. However, this is usually not ture for - a real use case. + def __init__(self, client_list, config): + self.client_list = client_list + self.config = config + self.result = {} + self.redis_connection_pool = None - Will replace below code. and take advantage of kafka framework - """ - - def __init__(self, kb_master=None): - self.kb_master = kb_master - self.client_status = {} - self.client_result = {} - - def check_server_httpd(self, instance, retry_count=60): + def polling_vms(self, timeout, polling_interval=None): ''' - Check the target server is up running + Polling all VMs for the status of execution + Guarantee to run once if the timeout is less than polling_interval ''' - LOG.info("[%s] Waiting for HTTP Server to come up..." % - instance.vm_name) - cmd = 'curl --head %s --connect-timeout 2' % (instance.target_url) - for retry in range(1, retry_count + 1): - try: - (status, _, _) = instance.exec_command(cmd) - except Exception as e: - traceback.print_exc() - self.client_status[instance.vm_name] = "ERROR: %s" % e.message - return - if not status: - return - LOG.debug("[%s] Waiting for HTTP Server to come up... Retry %d#" % - (instance.vm_name, retry)) + if not polling_interval: + polling_interval = self.config.polling_interval + retry_count = max(timeout / polling_interval, 1) + retry = cnt_succ = cnt_failed = 0 + clist = self.client_list - def setup_static_route(self, instance): - svr = instance.target_server - if not svr.fip_ip: - if svr.subnet_ip not in instance.get_static_route(svr.subnet_ip): - rc = instance.add_static_route(svr.subnet_ip, - svr.shared_interface_ip) - if rc > 0: - LOG.error("Failed to add static route, error code: %i." % - rc) - raise KBSetStaticRouteException() + while (retry < retry_count and len(clist)): + time.sleep(polling_interval) + for instance in clist: + msg = instance.redis_get_message() + if not msg: + # No new message, command in executing + continue + elif msg[0]: + # Command returned with non-zero status, command failed + cnt_failed = cnt_failed + 1 + else: + # Command returned with zero, command succeed + cnt_succ = cnt_succ + 1 + # Current instance finished execution + self.result[instance.vm_name] = msg + clist = [x for x in clist if x != instance] + + LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" % + (cnt_succ, cnt_failed, len(clist), retry)) + retry = retry + 1 + + return (cnt_succ, cnt_failed, len(clist)) + + def wait_for_vm_up(self, timeout=120): + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_list): + raise KBVMUpException() + + def setup_static_route(self, timeout=10): + for instance in self.client_list: + svr = instance.target_server + instance.add_static_route(svr.subnet_ip, svr.shared_interface_ip) + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_list): + raise KBSetStaticRouteException() + + def check_http_server_up(self, timeout=60): + for instance in self.client_list: + instance.check_http_service() + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_list): + raise KBHTTPServerUpException() + + def run_http_test(self): + for instance in self.client_list: + instance.run_http_client(threads=2, connections=5000, timeout=5, + connection_type="Keep-alive") + # Give additional 30 seconds for everybody to report results + timeout = self.config.exec_time + 30 + cnt_succ = self.polling_vms(timeout)[0] + if cnt_succ != len(self.client_list): + raise KBHTTPBenchException() + + def run(self): + LOG.info("Setting up redis connection pool...") + # For now, the redis server is not in the scope of Kloud Buster, which has to be + # pre-configured before executing Kloud Buster. + self.redis_connection_pool = redis.ConnectionPool( + host=self.config.redis_server, port=self.config.redis_server_port, db=0) - def setup_testing_env(self, instance): try: - instance.setup_ssh(instance.fip_ip or instance.fixed_ip, "ubuntu") - self.setup_static_route(instance) - self.check_server_httpd(instance) - except (sshutils.SSHError): - self.client_status[instance.vm_name] = "ERROR: Could not setup SSH Session." - return + LOG.info("Setting up the redis connections...") + for instance in self.client_list: + instance.setup_redis(connection_pool=self.redis_connection_pool) + + LOG.info("Waiting for agents on VMs to come up...") + self.wait_for_vm_up() + + LOG.info("Setting up static route to reach tested cloud...") + self.setup_static_route() + + LOG.info("Waiting for HTTP service to come up...") + self.check_http_server_up() + + if self.config.prompt_before_run: + print "Press enter to start running benchmarking tools..." + raw_input() + + LOG.info("Starting HTTP Benchmarking...") + self.run_http_test() + for key in self.result: + # TODO(Consolidating the data from all VMs) + print "[%s] %s" % (key, self.result[key][1]) + except (KBSetStaticRouteException): - self.client_status[instance.vm_name] = "ERROR: Could not set static route." + LOG.error("Could not set static route.") + return + except (KBHTTPServerUpException): + LOG.error("HTTP service is not up in testing cloud.") + return + except KBHTTPBenchException(): + LOG.error("Error in HTTP benchmarking.") return - - def run_test(self, instance): - try: - self.client_result[instance.vm_name] =\ - instance.run_http_client(threads=2, connections=5000, - timeout=5, connection_type="Keep-alive") - except Exception as e: - traceback.print_exc() - self.client_status[instance.vm_name] = "ERROR: %s" % e.message - - def run(self, client_list): - # Wait for kb_master and all clients to come up - # if not self.check_up_with_sshd(self.kb_master): - # raise - thread_list = [] - error_flag = False - - # Pre-allocate the dictionary - for cur_client in client_list: - self.client_status[cur_client.vm_name] = None - - LOG.info("Setting up the testing environments...") - for cur_client in client_list: - self.client_status[cur_client.vm_name] = "Success" - t = threading.Thread(target=self.setup_testing_env, args=[cur_client]) - thread_list.append(t) - t.start() - for cur_thread in thread_list: - cur_thread.join() - for cur_client in client_list: - vm_name = cur_client.vm_name - if self.client_status[vm_name] != "Success": - error_flag = True - LOG.info("%s: %s" % (vm_name, self.client_status[vm_name])) - if error_flag: - raise - - LOG.info("TEST STARTED") - - thread_list = [] - for cur_client in client_list: - self.client_status[cur_client.vm_name] = "Success" - t = threading.Thread(target=self.run_test, args=[cur_client]) - thread_list.append(t) - t.start() - for cur_thread in thread_list: - cur_thread.join() - for cur_client in client_list: - vm_name = cur_client.vm_name - if self.client_status[vm_name] == "Success": - LOG.info("%s: %s" % (vm_name, self.client_result[vm_name])) - else: - LOG.error("%s: %s" % (vm_name, self.client_status[vm_name])) diff --git a/scale/kb_vm_agent.py b/scale/kb_vm_agent.py new file mode 100644 index 0000000..fb988bc --- /dev/null +++ b/scale/kb_vm_agent.py @@ -0,0 +1,102 @@ +# Copyright 2015 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import socket +import subprocess +import sys +import threading +import time + +import redis + +class KB_VM_Agent(object): + + def __init__(self, host, port=6379): + self.redis_obj = redis.StrictRedis(host=host, port=port) + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.hello_thread = None + self.stop_hello = threading.Event() + # Assumption: + # Here we assume the vm_name is the same as the host name, which is + # true if the VM is spawned by Kloud Buster. + self.vm_name = socket.gethostname().lower() + self.orches_chan_name = self.vm_name.lower() + "_orches" + self.report_chan_name = self.vm_name.lower() + "_report" + + def setup_channels(self): + # Check for connections to redis server + while (True): + try: + self.redis_obj.get("test") + except (redis.exceptions.ConnectionError): + time.sleep(1) + continue + break + + # Subscribe to orchestration channel + self.pubsub.subscribe(self.orches_chan_name) + + def exec_command(self, cmd): + # Execute the command, and returns the outputs + cmds = ['bash', '-c'] + cmds.append(cmd) + p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate() + + return (p.returncode, stdout, stderr) + + def report(self, result): + self.redis_obj.publish(self.report_chan_name, result) + + def process_cmd(self, cmd_data): + cmd_res_tuple = self.exec_command(cmd_data['cmd']) + cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple)) + cmd_res_dict['parser_cb'] = cmd_data['parser_cb'] + self.report(cmd_res_dict) + + def send_hello(self): + # Sending "hello" message to master node every 2 seconds + while not self.stop_hello.is_set(): + self.report("hello") + time.sleep(2) + + def work(self): + for item in self.pubsub.listen(): + if item['type'] != 'message': + continue + if item['data'] == 'iamhere': + # When a "iamhere" packet is received, means the master node + # acknowledged the current VM. So stopped sending more + # "hello" packet to the master node. + # Unfortunately, there is no thread.stop() in Python 2.x + self.stop_hello.set() + continue + # Convert the string representation of dict to real dict obj + cmd_data = eval(item['data']) + self.process_cmd(cmd_data) + +if __name__ == "__main__": + + if (len(sys.argv) <= 1): + print("ERROR: Expecting the redis server address.") + sys.exit(1) + + redis_server, redis_server_port = sys.argv[1].split(':', 1) + agent = KB_VM_Agent(redis_server, redis_server_port) + agent.setup_channels() + agent.hello_thread = threading.Thread(target=agent.send_hello) + agent.hello_thread.daemon = True + agent.hello_thread.start() + agent.work() diff --git a/scale/kloudbuster.py b/scale/kloudbuster.py index 57fb868..fbf042d 100644 --- a/scale/kloudbuster.py +++ b/scale/kloudbuster.py @@ -22,6 +22,7 @@ from keystoneclient.v2_0 import client as keystoneclient import log as logging from novaclient.exceptions import ClientException from oslo_config import cfg +from tabulate import tabulate import tenant import credentials @@ -73,7 +74,7 @@ class Kloud(object): def create_resources(self, shared_net=None): self.shared_network = shared_net for tenant_count in xrange(self.scale_cfg['number_tenants']): - tenant_name = self.prefix + "_T" + str(tenant_count) + tenant_name = self.prefix + "-T" + str(tenant_count) new_tenant = tenant.Tenant(tenant_name, self) self.tenant_list.append(new_tenant) new_tenant.create_resources() @@ -108,7 +109,7 @@ class KloudBuster(object): self.tenant = None self.tenant_list_testing = [] self.tenant_testing = None - # to do : check on same auth_url instead + # TODO(check on same auth_url instead) if cred == testing_cred: self.single_cloud = True else: @@ -116,15 +117,27 @@ class KloudBuster(object): self.kloud = Kloud(config_scale.server, cred) self.testing_kloud = Kloud(config_scale.client, testing_cred, testing_side=True) - def print_vms_info(self, role): - pass - def print_provision_info(self): """ Function that iterates and prints all VM info for tested and testing cloud """ - pass + table = [["VM Name", "Internal IP", "Floating IP", "Subnet", "Shared Interface IP"]] + client_list = self.kloud.get_all_instances() + for instance in client_list: + row = [instance.vm_name, instance.fixed_ip, instance.fip_ip, instance.subnet_ip, + instance.shared_interface_ip] + table.append(row) + LOG.info('Provision Details (Tested Kloud)\n' + + tabulate(table, headers="firstrow", tablefmt="psql")) + + table = [["VM Name", "Internal IP", "Floating IP", "Subnet"]] + client_list = self.testing_kloud.get_all_instances() + for instance in client_list: + row = [instance.vm_name, instance.fixed_ip, instance.fip_ip, instance.subnet_ip] + table.append(row) + LOG.info('Provision Details (Testing Kloud)\n' + + tabulate(table, headers="firstrow", tablefmt="psql")) def run(self): """ @@ -155,8 +168,8 @@ class KloudBuster(object): client_list[idx].target_url = "http://%s/index.html" %\ (svr.fip_ip or svr.fixed_ip) - kbscheduler = kb_scheduler.KBScheduler() - kbscheduler.run(client_list) + kbscheduler = kb_scheduler.KBScheduler(client_list, config_scale.client) + kbscheduler.run() except KeyboardInterrupt: traceback.format_exc() except (sshutils.SSHError, ClientException, Exception): diff --git a/scale/log.py b/scale/log.py index bdf4979..530d955 100644 --- a/scale/log.py +++ b/scale/log.py @@ -28,7 +28,7 @@ CONF = cfg.CONF CONF.register_cli_opts(DEBUG_OPTS) oslogging.register_options(CONF) -logging.KBDEBUG = logging.DEBUG + 1 +logging.KBDEBUG = logging.DEBUG + 5 logging.addLevelName(logging.KBDEBUG, "KBDEBUG") CRITICAL = logging.CRITICAL @@ -41,7 +41,6 @@ KBDEBUG = logging.KBDEBUG WARN = logging.WARN WARNING = logging.WARNING - def setup(product_name, version="unknown"): dbg_color = handlers.ColorHandler.LEVEL_COLORS[logging.DEBUG] handlers.ColorHandler.LEVEL_COLORS[logging.KBDEBUG] = dbg_color diff --git a/scale/perf_instance.py b/scale/perf_instance.py index 1d2582c..9a8d12e 100644 --- a/scale/perf_instance.py +++ b/scale/perf_instance.py @@ -1,4 +1,4 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# Copyright 2015 Cisco Systems, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,53 +14,55 @@ # import os -import re import stat import subprocess +import time import sshutils from base_compute import BaseCompute import log as logging +import redis from wrk_tool import WrkTool LOG = logging.getLogger(__name__) - # An openstack instance (can be a VM or a LXC) class PerfInstance(BaseCompute): - def __init__(self, nova_client, user_name, config=None, is_server=False): - BaseCompute.__init__(self, nova_client, user_name) - - if not config: - # HACK ALERT!!! - # We are expecting to see a valid config, here we just hack - class config: - ssh_vm_username = "ubuntu" - tp_tool = None - http_tool = WrkTool - perf_tool_path = './tools' - private_key_file = './ssh/id_rsa' - ssh_retry_count = 50 - debug = True - time = 30 - vm_bandwidth = None + def __init__(self, vm_name, nova_client, user_name, config, is_server=False): + BaseCompute.__init__(self, vm_name, nova_client, user_name) self.config = config self.internal_ip = None + self.is_server = is_server + + # SSH Configuration self.ssh_ip = None self.ssh_user = config.ssh_vm_username self.ssh = None self.port = None - self.is_server = is_server - if config.tp_tool: - self.tp_tool = config.tp_tool(self, config.perf_tool_path) - else: + # Redis Configuration + self.redis_obj = None + self.pubsub = None + self.up_flag = False + self.orches_chan_name = self.vm_name.lower() + "_orches" + self.report_chan_name = self.vm_name.lower() + "_report" + + if 'tp_tool' not in config: self.tp_tool = None - if config.http_tool: - self.http_tool = config.http_tool(self, config.perf_tool_path) + # elif config.tp_tool.lower() == 'nuttcp': + # self.tp_tool = nuttcp_tool.NuttcpTool + # elif opts.tp_tool.lower() == 'iperf': + # self.tp_tool = iperf_tool.IperfTool + # else: + # self.tp_tool = None + + if 'http_tool' not in config: + self.http_tool = None + elif config.http_tool.lower() == 'wrk': + self.http_tool = WrkTool(self) self.target_server = None self.target_url = None else: @@ -68,6 +70,7 @@ class PerfInstance(BaseCompute): def run_tp_client(self, label, dest_ip, target_instance, mss=None, bandwidth=0, bidirectional=False, az_to=None): + # NOTE: This function will not work, and pending to convert to use redis '''test iperf client using the default TCP window size (tcp window scaling is normally enabled by default so setting explicit window size is not going to help achieve better results) @@ -103,19 +106,19 @@ class PerfInstance(BaseCompute): def run_http_client(self, threads, connections, timeout=5, connection_type="Keep-alive"): # HTTP Performance Measurement - if self.http_tool: - http_tool_res = self.http_tool.run_client(self.target_url, - threads, - connections, - timeout, - connection_type) - res = {'target_url': self.target_url} - if self.internal_ip: - res['ip_from'] = self.internal_ip - res['distro_id'] = self.ssh.distro_id - res['distro_version'] = self.ssh.distro_version - else: - http_tool_res = [] + cmd = self.http_tool.cmd_run_client(self.target_url, + threads, + connections, + timeout, + connection_type) + parser_cb = 'self.run_http_client_parser' + self.redis_exec_command(cmd, parser_cb) + + def run_http_client_parser(self, status, stdout, stderr): + http_tool_res = self.http_tool.cmd_parser_run_client(status, stdout, stderr) + res = {'target_url': self.target_url} + if self.internal_ip: + res['ip_from'] = self.internal_ip # consolidate results for all tools res['results'] = http_tool_res @@ -137,80 +140,83 @@ class PerfInstance(BaseCompute): # Send a command on the ssh session def exec_command(self, cmd, timeout=30): (status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout) - # if status: - # LOG.error("[%s] cmd=%s" % (self.vm_name, cmd)) - # if cmd_output: - # LOG.error("[%s] stdout=%s" % (self.vm_name, cmd_output)) - # if err: - # LOG.error("[%s] stderr=%s" % (self.vm_name, err)) - # LOG.kbdebug("[%s] %s" % (self.vm_name, cmd_output)) return (status, cmd_output, err) - # Ping an IP from this instance - def ping_check(self, target_ip, ping_count, pass_threshold): - return self.ssh.ping_check(target_ip, ping_count, pass_threshold) - - # Given a message size verify if ping without fragmentation works or fails - # Returns True if success - def ping_do_not_fragment(self, msg_size, ip_address): - cmd = "ping -M do -c 1 -s " + str(msg_size) + " " + ip_address - (_, cmd_output, _) = self.exec_command(cmd) - match = re.search('100% packet loss', cmd_output) - if match: - return False + # Setup the redis connectivity + def setup_redis(self, host=None, port=None, connection_pool=None): + if connection_pool: + self.redis_obj = redis.StrictRedis(connection_pool=connection_pool) else: - return True + self.redis_obj = redis.StrictRedis(host=host, port=port) - # Set the interface IP address and mask - def set_interface_ip(self, if_name, ip, mask): - LOG.kbdebug("[%s] Setting interface %s to %s mask %s" % (self.vm_name, - if_name, ip, - mask)) - cmd2apply = "sudo ifconfig %s %s netmask %s" % (if_name, ip, mask) - (rc, _, _) = self.ssh.execute(cmd2apply) - return rc + # Check for connections to redis server + for retry in xrange(1, self.config.redis_retry_count + 1): + try: + self.redis_obj.get("test") + except (redis.exceptions.ConnectionError): + LOG.warn("Connecting to redis server... Retry #%d", retry) + time.sleep(1) + continue + break + # Subscribe to message channel + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.pubsub.subscribe(self.report_chan_name) - # Get an interface IP address (returns None if error) - def get_interface_ip(self, if_name): - LOG.kbdebug("[%s] Getting interface %s IP and mask" % (self.vm_name, - if_name)) - cmd2apply = "ifconfig %s" % (if_name) - (rc, res, _) = self.ssh.execute(cmd2apply) - if rc: + return True + + def redis_get_message(self): + message = self.pubsub.get_message() + while message and message['data'] == 'hello': + # If a "hello" packet is received, the corresponding VM is up + # running. We mark the flag for that VM, and skip all "hello" + # messages received afterwards. + if self.up_flag: + message = self.pubsub.get_message() + else: + self.up_flag = True + self.redis_acknowledge_hello() + return (0, "", "") + if not message: return None - # eth5 Link encap:Ethernet HWaddr 90:e2:ba:40:74:05 - # inet addr:172.29.87.29 Bcast:172.29.87.31 Mask:255.255.255.240 - # inet6 addr: fe80::92e2:baff:fe40:7405/64 Scope:Link - match = re.search(r'inet addr:([\d\.]*) ', res) - if not match: - return None - return match.group(1) - # Set an interface MTU to passed in value - def set_interface_mtu(self, if_name, mtu): - LOG.kbdebug("[%s] Setting interface %s mtu to %d" % (self.vm_name, - if_name, mtu)) - cmd2apply = "sudo ifconfig %s mtu %d" % (if_name, mtu) - (rc, _, _) = self.ssh.execute(cmd2apply) - return rc + LOG.kbdebug(message) + msg_body = eval(message['data']) + status = int(msg_body['status']) + stdout = msg_body['stdout'] + stderr = msg_body['stderr'] + parser_cb = msg_body['parser_cb'] - # Get the MTU of an interface - def get_interface_mtu(self, if_name): - cmd = "cat /sys/class/net/%s/mtu" % (if_name) - (_, cmd_output, _) = self.exec_command(cmd) - return int(cmd_output) + if parser_cb is not None: + stdout = eval("%s(status, stdout, stderr)" % parser_cb) + + return (status, stdout, stderr) + + def redis_acknowledge_hello(self): + self.redis_obj.publish(self.orches_chan_name, "iamhere") + + def redis_exec_command(self, cmd, parser_cb=None, timeout=30): + # TODO(Add timeout support) + msg_body = {'cmd': cmd, 'parser_cb': parser_cb} + LOG.kbdebug(msg_body) + self.redis_obj.publish(self.orches_chan_name, msg_body) + + # Check whether the HTTP Service is up running + def check_http_service(self): + cmd = 'while true; do\n' + cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (self.target_url) + cmd += 'if [ $? -eq 0 ]; then break; fi\n' + cmd += 'done' + self.redis_exec_command(cmd, None) # Add static route def add_static_route(self, network, next_hop_ip, if_name=None): - debug_msg = "[%s] Adding static route %s with next hop %s" % ( - self.vm_name, network, - next_hop_ip) + debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) if if_name: debug_msg += " and %s" % if_name cmd += " dev %s" % if_name LOG.kbdebug(debug_msg) - return self.ssh.execute(cmd)[0] + self.redis_exec_command(cmd, None) # Get static route def get_static_route(self, network, next_hop_ip=None, if_name=None): @@ -219,11 +225,8 @@ class PerfInstance(BaseCompute): cmd += " via %s" % next_hop_ip if if_name: cmd += " dev %s" % if_name - (rc, out, err) = self.ssh.execute(cmd) - if rc: - return err - else: - return out + # TODO(Need to implement a parser_cb instead of passing None) + self.redis_exec_command(cmd, None) # Delete static route def delete_static_route(self, network, next_hop_ip=None, if_name=None): @@ -239,7 +242,7 @@ class PerfInstance(BaseCompute): debug_msg = "with next hop %s" % if_name cmd += " dev %s" % if_name LOG.kbdebug(debug_msg) - return self.ssh.execute(cmd)[0] + self.redis_exec_command(cmd, None) # scp a file from the local host to the instance # Returns True if dest file already exists or scp succeeded @@ -274,14 +277,11 @@ class PerfInstance(BaseCompute): return False return True - def get_cmd_duration(self): - '''Get the duration of the client run - Will normally return the time configured in config.time - ''' - return self.config.time - # Dispose the ssh session def dispose(self): if self.ssh: self.ssh.close() self.ssh = None + if self.redis_obj: + self.pubsub.unsubscribe() + self.pubsub.close() diff --git a/scale/perf_tool.py b/scale/perf_tool.py index 9243143..ae4c3ab 100644 --- a/scale/perf_tool.py +++ b/scale/perf_tool.py @@ -27,12 +27,11 @@ SCP_DEST_DIR = '/var/tmp/' class PerfTool(object): __metaclass__ = abc.ABCMeta - def __init__(self, name, perf_tool_path, instance): + def __init__(self, name, instance): self.name = name self.instance = instance self.dest_path = SCP_DEST_DIR + name self.pid = None - self.perf_tool_path = perf_tool_path # Terminate pid if started def dispose(self): @@ -55,7 +54,7 @@ class PerfTool(object): res['throughput_kbps'] = throughput if protocol is not None: res['protocol'] = protocol - if self.instance.config.vm_bandwidth: + if 'vm_bandwidth' in self.instance.config: res['bandwidth_limit_kbps'] = self.instance.config.vm_bandwidth if lossrate is not None: res['loss_rate'] = lossrate @@ -81,34 +80,13 @@ class PerfTool(object): res['http_err'] = http_err return res - def get_boost_client_cmd(self): - cmd = 'ulimit -n 102400 && ' \ - 'sysctl -w fs.file-max=6553550 && ' \ - 'sysctl -w net.core.wmem_max=8388608 && ' \ - 'sysctl -w net.core.wmem_default=8388608 && ' \ - 'sysctl -w net.core.rmem_max=33554432 && ' \ - 'sysctl -w net.core.rmem_default=33554432 && ' \ - 'sysctl -w net.core.netdev_max_backlog=100000 && ' \ - 'sysctl -w net.ipv4.icmp_ratelimit=0 && ' \ - 'sysctl -w net.ipv4.tcp_tw_recycle=1 && ' \ - 'sysctl -w net.ipv4.tcp_tw_reuse=1 && ' \ - 'sysctl -w net.ipv4.tcp_max_tw_buckets=65536 && ' \ - 'sysctl -w net.ipv4.tcp_fin_timeout=15 && ' \ - 'sysctl -w net.ipv4.tcp_max_syn_backlog=65536 && ' \ - 'sysctl -w net.ipv4.tcp_syncookies=1 && ' \ - 'sysctl -w net.ipv4.neigh.default.gc_thresh1=4096 && ' \ - 'sysctl -w net.ipv4.neigh.default.gc_thresh2=4096 && ' \ - 'sysctl -w net.ipv4.neigh.default.gc_thresh3=4096 && ' \ - 'sysctl -w net.ipv4.conf.all.rp_filter=0 && ' \ - 'sysctl -w net.ipv4.conf.all.arp_filter=0 && ' \ - 'sysctl -w net.ipv4.conf.default.rp_filter=0 && ' \ - 'sysctl -w net.ipv4.conf.default.arp_filter=0 && ' \ - 'sysctl -w net.ipv4.conf.eth0.rp_filter=0 && ' \ - 'sysctl -w net.ipv4.conf.eth0.arp_filter=0' - return cmd + @abc.abstractmethod + def cmd_run_client(**kwargs): + # must be implemented by sub classes + return None @abc.abstractmethod - def run_client(**kwargs): + def cmd_parser_run_client(self, status, stdout, stderr): # must be implemented by sub classes return None diff --git a/scale/tenant.py b/scale/tenant.py index 232329f..718d0d4 100644 --- a/scale/tenant.py +++ b/scale/tenant.py @@ -49,7 +49,7 @@ class Tenant(object): LOG.info("Creating tenant: " + self.tenant_name) self.tenant_object = \ self.kloud.keystone.tenants.create(tenant_name=self.tenant_name, - description="Test tenant", + description="KloudBuster tenant", enabled=True) except keystone_exception.Conflict as exc: # ost likely the entry already exists: @@ -74,7 +74,7 @@ class Tenant(object): # Loop over the required number of users and create resources for user_count in xrange(self.kloud.scale_cfg['users_per_tenant']): - user_name = self.tenant_name + "_U" + str(user_count) + user_name = self.tenant_name + "-U" + str(user_count) user_instance = users.User(user_name, self, self.kloud.scale_cfg['keystone_admin_role']) diff --git a/scale/users.py b/scale/users.py index d381f8c..0ce0a41 100644 --- a/scale/users.py +++ b/scale/users.py @@ -62,7 +62,7 @@ class User(object): LOG.info("Creating user: " + self.user_name) return self.tenant.kloud.keystone.users.create(name=self.user_name, password=self.user_name, - email="test.com", + email="kloudbuster@localhost", tenant_id=self.tenant.tenant_id) def _get_user(self): @@ -132,7 +132,9 @@ class User(object): config_scale = self.tenant.kloud.scale_cfg # Find the external network that routers need to attach to - if config_scale['use_floatingip']: + # if redis_server is configured, we need to attach the router to the + # external network in order to reach the redis_server + if config_scale['use_floatingip'] or 'redis_server' in config_scale: external_network = base_network.find_external_network(self.neutron) else: external_network = None @@ -142,7 +144,7 @@ class User(object): router_instance = base_network.Router(self.neutron, self.nova, self.user_name, self.tenant.kloud.shared_network) self.router_list.append(router_instance) - router_name = self.user_name + "_R" + str(router_count) + router_name = self.user_name + "-R" + str(router_count) # Create the router and also attach it to external network router_instance.create_router(router_name, external_network) # Now create the network resources inside the router diff --git a/scale/wrk_tool.py b/scale/wrk_tool.py index 280821b..931579b 100644 --- a/scale/wrk_tool.py +++ b/scale/wrk_tool.py @@ -1,4 +1,4 @@ -# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# Copyright 2015 Cisco Systems, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -18,47 +18,29 @@ import re import log as logging from perf_tool import PerfTool -import sshutils LOG = logging.getLogger(__name__) class WrkTool(PerfTool): - def __init__(self, instance, perf_tool_path): - PerfTool.__init__(self, 'wrk-4.0.1', perf_tool_path, instance) + def __init__(self, instance): + PerfTool.__init__(self, 'wrk-4.0.1', instance) - def get_server_launch_cmd(self): - '''This requires HTTP server is running already + def cmd_run_client(self, target_url, threads, connections, + timeout=5, connetion_type='Keep-alive', retry_count=10): ''' - return None - - def run_client(self, target_url, threads, connections, - timeout=5, connetion_type='New', retry_count=10): - '''Run the test - :return: list containing one or more dictionary results + Return the command for running the benchmarking tool ''' - - duration_sec = self.instance.get_cmd_duration() - - # boost_cmd = self.get_boost_client_cmd() - # cmd = 'sudo sh -c "' + boost_cmd + ' && exec su $LOGNAME -c \'' + duration_sec = self.instance.config.exec_time cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \ (self.dest_path, threads, connections, duration_sec, timeout, target_url) - # cmd += ' && exit\'"' - - LOG.kbdebug("[%s] Measuring HTTP performance..." % - self.instance.vm_name) LOG.kbdebug("[%s] %s" % (self.instance.vm_name, cmd)) - try: - # force the timeout value with 20 seconds extra for the command to - # complete and do not collect CPU - (_, cmd_out, _) = self.instance.exec_command(cmd, duration_sec + 20) - except sshutils.SSHError as exc: - # Timout or any SSH error - LOG.error("SSH Error: " + str(exc)) - return [self.parse_error(str(exc))] + return cmd + def cmd_parser_run_client(self, status, stdout, stderr): + if status: + return [self.parse_error(stderr)] # Sample Output: # Running 10s test @ http://192.168.1.1/index.html # 8 threads and 5000 connections @@ -75,16 +57,15 @@ class WrkTool(PerfTool): # Non-2xx or 3xx responses: 828 # Requests/sec: 6080.66 # Transfer/sec: 282.53MB - try: total_req_str = r'(\d+)\srequests\sin' - http_total_req = re.search(total_req_str, cmd_out).group(1) + http_total_req = re.search(total_req_str, stdout).group(1) re_str = r'Requests/sec:\s+(\d+\.\d+)' - http_rps = re.search(re_str, cmd_out).group(1) + http_rps = re.search(re_str, stdout).group(1) re_str = r'Transfer/sec:\s+(\d+\.\d+.B)' - http_rates_kbytes = re.search(re_str, cmd_out).group(1) + http_rates_kbytes = re.search(re_str, stdout).group(1) # Uniform in unit MB ex_unit = 'KMG'.find(http_rates_kbytes[-2]) if ex_unit == -1: @@ -93,7 +74,7 @@ class WrkTool(PerfTool): http_rates_kbytes = float(val * (1024 ** (ex_unit))) re_str = r'Socket errors: connect (\d+), read (\d+), write (\d+), timeout (\d+)' - http_sock_err = re.search(re_str, cmd_out) + http_sock_err = re.search(re_str, stdout) if http_sock_err: v1 = int(http_sock_err.group(1)) v2 = int(http_sock_err.group(2)) @@ -104,13 +85,13 @@ class WrkTool(PerfTool): http_sock_err = 0 re_str = r'Non-2xx or 3xx responses: (\d+)' - http_err = re.search(re_str, cmd_out) + http_err = re.search(re_str, stdout) if http_err: http_err = http_err.group(1) else: http_err = 0 except Exception: - return self.parse_error('Could not parse: %s' % (cmd_out)) + return self.parse_error('Could not parse: %s' % (stdout)) return self.parse_results(http_total_req=http_total_req, http_rps=http_rps,