diff --git a/scale/base_compute.py b/scale/base_compute.py index a7dd321..62c524c 100644 --- a/scale/base_compute.py +++ b/scale/base_compute.py @@ -27,9 +27,8 @@ class BaseCompute(object): """ - def __init__(self, vm_name, nova_client, user_name): - self.novaclient = nova_client - self.user_name = user_name + def __init__(self, vm_name, network): + self.novaclient = network.router.user.nova_client self.vm_name = vm_name self.instance = None self.fip = None diff --git a/scale/base_network.py b/scale/base_network.py index a4c0258..e9c65cf 100644 --- a/scale/base_network.py +++ b/scale/base_network.py @@ -78,21 +78,20 @@ class BaseNetwork(object): - def __init__(self, neutron_client, nova_client, user_name, shared_interface_ip=None): + def __init__(self, router): """ Store the neutron client User name for this network and network object """ - self.neutron_client = neutron_client - self.nova_client = nova_client - self.user_name = user_name + self.neutron_client = router.user.neutron_client + self.nova_client = router.user.nova_client + self.router = router self.network = None self.instance_list = [] self.secgroup_list = [] self.keypair_list = [] # Store the shared interface ip of router for tested and testing cloud - self.shared_interface_ip = shared_interface_ip def create_compute_resources(self, network_prefix, config_scale): """ @@ -115,47 +114,28 @@ class BaseNetwork(object): keypair_name = network_prefix + "-K" + str(keypair_count) keypair_instance.add_public_key(keypair_name, config_scale['public_key_file']) - # Create the required number of VMs - # Create the VMs on specified network, first keypair, first secgroup + LOG.info("Scheduled to create virtual machines...") if config_scale['use_floatingip']: external_network = find_external_network(self.neutron_client) - LOG.info("Creating Virtual machines for user %s" % self.user_name) - if 'redis_server' in config_scale: - # Here we are creating a testing VM (client), put the redis server - # information in the user_data. - redis_server = config_scale['redis_server'] - redis_server_port = config_scale['redis_server_port'] - user_data = redis_server + ":" + str(redis_server_port) - else: - user_data = None + # Schedule to create the required number of VMs for instance_count in range(config_scale['vms_per_network']): vm_name = network_prefix + "-I" + str(instance_count) - perf_instance = PerfInstance(vm_name, self.nova_client, self.user_name, config_scale) + perf_instance = PerfInstance(vm_name, self, config_scale) self.instance_list.append(perf_instance) - nic_used = [{'net-id': self.network['id']}] - LOG.info("Creating Instance: " + vm_name) - perf_instance.create_server(config_scale['image_name'], - config_scale['flavor_type'], - self.keypair_list[0].keypair_name, - nic_used, - self.secgroup_list[0].secgroup, - user_data=user_data) - # Store the subnet info and fixed ip address in instance + perf_instance.subnet_ip = self.network['subnet_ip'] - perf_instance.fixed_ip = perf_instance.instance.networks.values()[0][0] - if self.shared_interface_ip: - perf_instance.shared_interface_ip = self.shared_interface_ip if config_scale['use_floatingip']: # Create the floating ip for the instance - # store it and the ip address in instance object + # store it and the ip address in perf_instance object perf_instance.fip = create_floating_ip(self.neutron_client, external_network) perf_instance.fip_ip = perf_instance.fip['floatingip']['floating_ip_address'] - # Associate the floating ip with this instance - perf_instance.instance.add_floating_ip(perf_instance.fip_ip) - perf_instance.ssh_ip = perf_instance.fip_ip - else: - # Store the fixed ip as ssh ip since there is no floating ip - perf_instance.ssh_ip = perf_instance.fixed_ip + + # Create the VMs on specified network, first keypair, first secgroup + perf_instance.boot_info['image_name'] = config_scale['image_name'] + perf_instance.boot_info['flavor_type'] = config_scale['flavor_type'] + perf_instance.boot_info['keyname'] = self.keypair_list[0].keypair_name + perf_instance.boot_info['nic'] = [{'net-id': self.network['id']}] + perf_instance.boot_info['sec_group'] = self.secgroup_list[0].secgroup def delete_compute_resources(self): """ @@ -234,15 +214,15 @@ class Router(object): of network interfaces to router """ - def __init__(self, neutron_client, nova_client, user_name, shared_network=None): - self.neutron_client = neutron_client - self.nova_client = nova_client + def __init__(self, user): + self.neutron_client = user.neutron_client + self.nova_client = user.nova_client self.router = None - self.user_name = user_name + self.user = user # Stores the list of networks self.network_list = [] # Store the shared network - self.shared_network = shared_network + self.shared_network = None self.shared_port_id = None # Store the interface ip of shared network attached to router self.shared_interface_ip = None @@ -253,16 +233,11 @@ class Router(object): Also triggers the creation of compute resources inside each network """ - # If a shared network exists create a port on this - # network and attach to router interface - if self.shared_network: - self.attach_router_interface(self.shared_network, use_port=True) for network_count in range(config_scale['networks_per_router']): - network_instance = BaseNetwork(self.neutron_client, self.nova_client, self.user_name, - self.shared_interface_ip) + network_instance = BaseNetwork(self) self.network_list.append(network_instance) # Create the network and subnet - network_name = self.user_name + "-N" + str(network_count) + network_name = self.user.user_name + "-N" + str(network_count) network_instance.create_network_and_subnet(network_name) # Attach the created network to router interface self.attach_router_interface(network_instance) @@ -289,7 +264,8 @@ class Router(object): for network in self.network_list: # Now delete the compute resources and the network resources network.delete_compute_resources() - self.remove_router_interface(network) + if network.network: + self.remove_router_interface(network) network.delete_network() # Also delete the shared port and remove it from router interface if self.shared_network: @@ -364,8 +340,6 @@ class Router(object): } self.neutron_client.add_interface_router(self.router['router']['id'], body) - - def remove_router_interface(self, network_instance, use_port=False): """ Remove the network interface from router diff --git a/scale/cfg.scale.yaml b/scale/cfg.scale.yaml index 3dbe55d..3762aa9 100644 --- a/scale/cfg.scale.yaml +++ b/scale/cfg.scale.yaml @@ -1,7 +1,7 @@ # KloudBuster Default configuration file server: # Number of tenants to be created on the cloud - number_tenants: 1 + number_tenants: 2 # Number of Users to be created inside the tenant users_per_tenant: 1 @@ -15,7 +15,7 @@ server: routers_per_user: 1 # Number of VM instances to be created within the context of each User - vms_per_network: 2 + vms_per_network: 1 # Number of security groups per network secgroups_per_network: 1 @@ -63,7 +63,7 @@ client: keypairs_per_network: 1 # Assign floating IP for every VM - use_floatingip: False + use_floatingip: False # Specify whether the testing cloud is running in same cloud run_on_same_cloud: True @@ -79,12 +79,26 @@ client: redis_retry_count: 50 polling_interval: 5 - # Duration of testing tools (seconds) - exec_time: 30 - # Tooling - tp_tool: 'nuttcp' - http_tool: 'wrk' + tp_tool: + name: 'nuttcp' + dest_path: '/var/tmp/nuttcp-7.3.2' + http_tool: + name: 'wrk' + dest_path: '/var/tmp/wrk-4.0.1' + + # HTTP Tool Specific Configs + http_tool_configs: + # Threads to run tests + threads: 2 + # Connections to be kept concurrently + connections: 5000 + # Timeout for HTTP requests + timeout: 5 + # Connection Type: "Keep-alive", "New" + connection_type: 'Keep-alive' + # Duration of testing tools (seconds) + duration: 30 # Prompt before running benchmarking tools prompt_before_run: False @@ -93,5 +107,5 @@ client: keystone_admin_role: "admin" cleanup_resources: True public_key_file: '../ssh/id_rsa.pub' - image_name: 'Scale Image v4' - flavor_type: 'm1.small' + image_name: 'Scale Image v4a' + flavor_type: 'kb_flavor' diff --git a/scale/kb_scheduler.py b/scale/kb_scheduler.py index eb38876..321c661 100644 --- a/scale/kb_scheduler.py +++ b/scale/kb_scheduler.py @@ -37,10 +37,37 @@ class KBScheduler(object): """ def __init__(self, client_list, config): - self.client_list = client_list + self.client_dict = dict(zip([x.vm_name.lower() for x in client_list], client_list)) self.config = config self.result = {} - self.redis_connection_pool = None + + # Redis + self.connection_pool = None + self.redis_obj = None + self.pubsub = None + self.orches_chan_name = "kloudbuster_orches" + self.report_chan_name = "kloudbuster_report" + + def setup_redis(self): + self.redis_obj = redis.StrictRedis(connection_pool=self.connection_pool) + # Check for connections to redis server + for retry in xrange(1, self.config.redis_retry_count + 1): + try: + self.redis_obj.get("test") + except (redis.exceptions.ConnectionError): + LOG.warn("Connecting to redis server... Retry #%d", retry) + time.sleep(1) + continue + break + # Subscribe to message channel + self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) + self.pubsub.subscribe(self.report_chan_name) + + def send_cmd(self, cmd, client_type, data): + message = {'cmd': cmd, 'sender-id': 'kb-master', + 'client-type': client_type, 'data': data} + LOG.kbdebug(message) + self.redis_obj.publish(self.orches_chan_name, message) def polling_vms(self, timeout, polling_interval=None): ''' @@ -51,24 +78,40 @@ class KBScheduler(object): polling_interval = self.config.polling_interval retry_count = max(timeout / polling_interval, 1) retry = cnt_succ = cnt_failed = 0 - clist = self.client_list + clist = self.client_dict.copy() while (retry < retry_count and len(clist)): time.sleep(polling_interval) - for instance in clist: - msg = instance.redis_get_message() + while True: + msg = self.pubsub.get_message() if not msg: - # No new message, command in executing - continue - elif msg[0]: - # Command returned with non-zero status, command failed - cnt_failed = cnt_failed + 1 - else: - # Command returned with zero, command succeed - cnt_succ = cnt_succ + 1 - # Current instance finished execution - self.result[instance.vm_name] = msg - clist = [x for x in clist if x != instance] + # No new message, commands are in executing + break + + LOG.kbdebug(msg) + payload = eval(msg['data']) + vm_name = payload['sender-id'] + instance = self.client_dict[vm_name] + if payload['cmd'] == 'READY': + # If a READY packet is received, the corresponding VM is up + # running. We mark the flag for that VM, and skip all READY + # messages received afterwards. + if instance.up_flag: + continue + else: + self.send_cmd('ACK', None, None) + clist[vm_name].up_flag = True + clist.pop(vm_name) + cnt_succ = cnt_succ + 1 + elif payload['cmd'] == 'DONE': + self.result[vm_name] = payload['data'] + clist.pop(vm_name) + if self.result[vm_name]['status']: + # Command returned with non-zero status, command failed + cnt_failed = cnt_failed + 1 + else: + # Command returned with zero, command succeed + cnt_succ = cnt_succ + 1 LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" % (cnt_succ, cnt_failed, len(clist), retry)) @@ -78,45 +121,46 @@ class KBScheduler(object): def wait_for_vm_up(self, timeout=120): cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_list): + if cnt_succ != len(self.client_dict): raise KBVMUpException() def setup_static_route(self, timeout=10): - for instance in self.client_list: - svr = instance.target_server - instance.add_static_route(svr.subnet_ip, svr.shared_interface_ip) + func = {'cmd': 'setup_static_route'} + self.send_cmd('EXEC', 'http', func) cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_list): + if cnt_succ != len(self.client_dict): raise KBSetStaticRouteException() - def check_http_server_up(self, timeout=60): - for instance in self.client_list: - instance.check_http_service() + def check_http_service(self, timeout=60): + func = {'cmd': 'check_http_service'} + self.send_cmd('EXEC', 'http', func) cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_list): + if cnt_succ != len(self.client_dict): raise KBHTTPServerUpException() def run_http_test(self): - for instance in self.client_list: - instance.run_http_client(threads=2, connections=5000, timeout=5, - connection_type="Keep-alive") + func = {'cmd': 'run_http_test'} + self.send_cmd('EXEC', 'http', func) # Give additional 30 seconds for everybody to report results - timeout = self.config.exec_time + 30 + timeout = self.config.http_tool_configs.duration + 3000 cnt_succ = self.polling_vms(timeout)[0] - if cnt_succ != len(self.client_list): + if cnt_succ != len(self.client_dict): raise KBHTTPBenchException() + # Parse the results from HTTP Tools + for key, instance in self.client_dict.items(): + self.result[key] = instance.http_client_parser(**self.result[key]) + def run(self): LOG.info("Setting up redis connection pool...") # For now, the redis server is not in the scope of Kloud Buster, which has to be # pre-configured before executing Kloud Buster. - self.redis_connection_pool = redis.ConnectionPool( + self.connection_pool = redis.ConnectionPool( host=self.config.redis_server, port=self.config.redis_server_port, db=0) try: LOG.info("Setting up the redis connections...") - for instance in self.client_list: - instance.setup_redis(connection_pool=self.redis_connection_pool) + self.setup_redis() LOG.info("Waiting for agents on VMs to come up...") self.wait_for_vm_up() @@ -125,7 +169,7 @@ class KBScheduler(object): self.setup_static_route() LOG.info("Waiting for HTTP service to come up...") - self.check_http_server_up() + self.check_http_service() if self.config.prompt_before_run: print "Press enter to start running benchmarking tools..." @@ -133,9 +177,9 @@ class KBScheduler(object): LOG.info("Starting HTTP Benchmarking...") self.run_http_test() - for key in self.result: + for val in self.result.values(): # TODO(Consolidating the data from all VMs) - print "[%s] %s" % (key, self.result[key][1]) + print val except (KBSetStaticRouteException): LOG.error("Could not set static route.") diff --git a/scale/kb_vm_agent.py b/scale/kb_vm_agent.py index fb988bc..7e5e5e5 100644 --- a/scale/kb_vm_agent.py +++ b/scale/kb_vm_agent.py @@ -21,19 +21,82 @@ import time import redis +class KB_Instance(object): + + # Check whether the HTTP Service is up running) + @staticmethod + def check_http_service(target_url): + cmd = 'while true; do\n' + cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (target_url) + cmd += 'if [ $? -eq 0 ]; then break; fi\n' + cmd += 'done' + return cmd + + # Add static route + @staticmethod + def add_static_route(network, next_hop_ip, if_name=None): + debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) + cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) + if if_name: + debug_msg += " and %s" % if_name + cmd += " dev %s" % if_name + # TODO(Logging on Agent) + print debug_msg + return cmd + + # Get static route + @staticmethod + def get_static_route(network, next_hop_ip=None, if_name=None): + cmd = "ip route show %s" % network + if next_hop_ip: + cmd += " via %s" % next_hop_ip + if if_name: + cmd += " dev %s" % if_name + return cmd + + # Delete static route + @staticmethod + def delete_static_route(network, next_hop_ip=None, if_name=None): + debug_msg = "Deleting static route %s" % network + cmd = "sudo ip route del %s" % network + if next_hop_ip: + debug_msg = " with next hop %s" % next_hop_ip + cmd += " via %s" % next_hop_ip + if if_name: + if next_hop_ip: + debug_msg = " and %s" % if_name + else: + debug_msg = "with next hop %s" % if_name + cmd += " dev %s" % if_name + # TODO(Logging on Agent) + print debug_msg + return cmd + + # Run the HTTP benchmarking tool + @staticmethod + def run_http_test(dest_path, target_url, threads, connections, + duration, timeout, connection_type): + cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \ + (dest_path, threads, connections, duration, timeout, target_url) + return cmd + + class KB_VM_Agent(object): - def __init__(self, host, port=6379): + def __init__(self, user_data): + host = user_data['redis_server'] + port = user_data['redis_server_port'] + self.user_data = user_data self.redis_obj = redis.StrictRedis(host=host, port=port) self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) self.hello_thread = None self.stop_hello = threading.Event() # Assumption: - # Here we assume the vm_name is the same as the host name, which is - # true if the VM is spawned by Kloud Buster. + # Here we assume the vm_name is the same as the host name (lower case), + # which is true if the VM is spawned by Kloud Buster. self.vm_name = socket.gethostname().lower() - self.orches_chan_name = self.vm_name.lower() + "_orches" - self.report_chan_name = self.vm_name.lower() + "_report" + self.orches_chan_name = "kloudbuster_orches" + self.report_chan_name = "kloudbuster_report" def setup_channels(self): # Check for connections to redis server @@ -48,6 +111,17 @@ class KB_VM_Agent(object): # Subscribe to orchestration channel self.pubsub.subscribe(self.orches_chan_name) + def report(self, cmd, client_type, data): + message = {'cmd': cmd, 'sender-id': self.vm_name, + 'client-type': client_type, 'data': data} + self.redis_obj.publish(self.report_chan_name, message) + + def send_hello(self): + # Sending "hello" message to master node every 2 seconds + while not self.stop_hello.is_set(): + self.report('READY', None, None) + time.sleep(2) + def exec_command(self, cmd): # Execute the command, and returns the outputs cmds = ['bash', '-c'] @@ -57,44 +131,56 @@ class KB_VM_Agent(object): return (p.returncode, stdout, stderr) - def report(self, result): - self.redis_obj.publish(self.report_chan_name, result) - - def process_cmd(self, cmd_data): - cmd_res_tuple = self.exec_command(cmd_data['cmd']) - cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple)) - cmd_res_dict['parser_cb'] = cmd_data['parser_cb'] - self.report(cmd_res_dict) - - def send_hello(self): - # Sending "hello" message to master node every 2 seconds - while not self.stop_hello.is_set(): - self.report("hello") - time.sleep(2) + def process_cmd(self, msg): + if msg['cmd'] == 'ACK': + # When 'ACK' is received, means the master node + # acknowledged the current VM. So stopped sending more + # "hello" packet to the master node. + # Unfortunately, there is no thread.stop() in Python 2.x + self.stop_hello.set() + elif msg['cmd'] == 'EXEC': + cmd_res_tuple = eval('self.exec_' + msg['data']['cmd'] + '()') + cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple)) + self.report('DONE', msg['client-type'], cmd_res_dict) + elif msg['cmd'] == 'ABORT': + # TODO(Add support to abort a session) + pass def work(self): for item in self.pubsub.listen(): if item['type'] != 'message': continue - if item['data'] == 'iamhere': - # When a "iamhere" packet is received, means the master node - # acknowledged the current VM. So stopped sending more - # "hello" packet to the master node. - # Unfortunately, there is no thread.stop() in Python 2.x - self.stop_hello.set() - continue # Convert the string representation of dict to real dict obj - cmd_data = eval(item['data']) - self.process_cmd(cmd_data) + msg = eval(item['data']) + self.process_cmd(msg) + + def exec_setup_static_route(self): + cmd = KB_Instance.add_static_route(self.user_data['target_subnet_ip'], + self.user_data['target_shared_interface_ip']) + return self.exec_command(cmd) + + def exec_check_http_service(self): + cmd = KB_Instance.check_http_service(self.user_data['target_url']) + return self.exec_command(cmd) + + def exec_run_http_test(self): + cmd = KB_Instance.run_http_test(dest_path=self.user_data['http_tool']['dest_path'], + target_url=self.user_data['target_url'], + **self.user_data['http_tool_configs']) + return self.exec_command(cmd) + if __name__ == "__main__": - if (len(sys.argv) <= 1): - print("ERROR: Expecting the redis server address.") + try: + f = open('/var/tmp/user-data', 'r') + user_data = eval(f.read()) + except Exception as e: + # TODO(Logging on Agent) + print e.message sys.exit(1) - redis_server, redis_server_port = sys.argv[1].split(':', 1) - agent = KB_VM_Agent(redis_server, redis_server_port) + agent = KB_VM_Agent(user_data) agent.setup_channels() agent.hello_thread = threading.Thread(target=agent.send_hello) agent.hello_thread.daemon = True diff --git a/scale/kloudbuster.py b/scale/kloudbuster.py index fbf042d..cb152c2 100644 --- a/scale/kloudbuster.py +++ b/scale/kloudbuster.py @@ -69,10 +69,8 @@ class Kloud(object): # if this cloud is sharing a network then all tenants must hook up to # it and on deletion that shared network must NOT be deleted # as it will be deleted by the owner - self.shared_network = None - def create_resources(self, shared_net=None): - self.shared_network = shared_net + def create_resources(self): for tenant_count in xrange(self.scale_cfg['number_tenants']): tenant_name = self.prefix + "-T" + str(tenant_count) new_tenant = tenant.Tenant(tenant_name, self) @@ -94,6 +92,34 @@ class Kloud(object): all_instances.extend(tnt.get_all_instances()) return all_instances + def attach_to_shared_net(self, shared_net): + # If a shared network exists create a port on this + # network and attach to router interface + for tnt in self.tenant_list: + for usr in tnt.user_list: + for rtr in usr.router_list: + rtr.shared_network = shared_net + rtr.attach_router_interface(shared_net, use_port=True) + for net in rtr.network_list: + for ins in net.instance_list: + ins.shared_interface_ip = rtr.shared_interface_ip + + def create_vms(self): + # TODO(Make the creation concurrently) + for instance in self.get_all_instances(): + LOG.info("Creating Instance: " + instance.vm_name) + instance.create_server(**instance.boot_info) + + instance.fixed_ip = instance.instance.networks.values()[0][0] + if instance.config['use_floatingip']: + # Associate the floating ip with this instance + instance.instance.add_floating_ip(instance.fip_ip) + instance.ssh_ip = instance.fip_ip + else: + # Store the fixed ip as ssh ip since there is no floating ip + instance.ssh_ip = instance.fixed_ip + + class KloudBuster(object): """ Creates resources on the cloud for loading up the cloud @@ -139,6 +165,26 @@ class KloudBuster(object): LOG.info('Provision Details (Testing Kloud)\n' + tabulate(table, headers="firstrow", tablefmt="psql")) + def gen_user_data(self): + LOG.info("Preparing metadata for testing VMs...") + # We supposed to have a mapping framework/algorithm to mapping clients to servers. + # e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc. + # Here we are using N*1:1 + client_list = self.testing_kloud.get_all_instances() + svr_list = self.kloud.get_all_instances() + + for idx, ins in enumerate(client_list): + ins.target_url = "http://%s/index.html" %\ + (svr_list[idx].fip_ip or svr_list[idx].fixed_ip) + ins.user_data['redis_server'] = ins.config['redis_server'] + ins.user_data['redis_server_port'] = ins.config['redis_server_port'] + ins.user_data['target_subnet_ip'] = svr_list[idx].subnet_ip + ins.user_data['target_shared_interface_ip'] = svr_list[idx].shared_interface_ip + ins.user_data['target_url'] = ins.target_url + ins.user_data['http_tool'] = ins.config['http_tool'] + ins.user_data['http_tool_configs'] = ins.config['http_tool_configs'] + ins.boot_info['user_data'] = str(ins.user_data) + def run(self): """ The runner for KloudBuster Tests @@ -147,27 +193,21 @@ class KloudBuster(object): """ try: - # Create the testing cloud resources + self.kloud.create_resources() + self.kloud.create_vms() self.testing_kloud.create_resources() - # Find the shared network if the cloud used to testing is same if self.single_cloud: - shared_network = self.testing_kloud.get_first_network() - else: - shared_network = None - self.kloud.create_resources(shared_network) + # Find the shared network if the cloud used to testing is same + # Attach the router in tested kloud to the shared network + shared_net = self.testing_kloud.get_first_network() + self.kloud.attach_to_shared_net(shared_net) + self.gen_user_data() + self.testing_kloud.create_vms() # Function that print all the provisioning info self.print_provision_info() - # We supposed to have a mapping framework/algorithm to mapping clients to servers. - # e.g. 1:1 mapping, 1:n mapping, n:1 mapping, etc. - # Here we are using N*1:1 client_list = self.testing_kloud.get_all_instances() - for idx, svr in enumerate(self.kloud.get_all_instances()): - client_list[idx].target_server = svr - client_list[idx].target_url = "http://%s/index.html" %\ - (svr.fip_ip or svr.fixed_ip) - kbscheduler = kb_scheduler.KBScheduler(client_list, config_scale.client) kbscheduler.run() except KeyboardInterrupt: @@ -178,9 +218,16 @@ class KloudBuster(object): # Cleanup: start with tested side first # then testing side last (order is important because of the shared network) if config_scale.server['cleanup_resources']: - self.kloud.delete_resources() + try: + self.kloud.delete_resources() + except Exception: + traceback.print_exc() if config_scale.client['cleanup_resources']: - self.testing_kloud.delete_resources() + try: + self.testing_kloud.delete_resources() + except Exception: + traceback.print_exc() + if __name__ == '__main__': # The default configuration file for KloudBuster diff --git a/scale/perf_instance.py b/scale/perf_instance.py index 9a8d12e..18d8fbc 100644 --- a/scale/perf_instance.py +++ b/scale/perf_instance.py @@ -16,13 +16,11 @@ import os import stat import subprocess -import time import sshutils from base_compute import BaseCompute import log as logging -import redis from wrk_tool import WrkTool LOG = logging.getLogger(__name__) @@ -30,12 +28,14 @@ LOG = logging.getLogger(__name__) # An openstack instance (can be a VM or a LXC) class PerfInstance(BaseCompute): - def __init__(self, vm_name, nova_client, user_name, config, is_server=False): - BaseCompute.__init__(self, vm_name, nova_client, user_name) + def __init__(self, vm_name, network, config, is_server=False): + BaseCompute.__init__(self, vm_name, network) self.config = config - self.internal_ip = None self.is_server = is_server + self.boot_info = {} + self.user_data = {} + self.up_flag = False # SSH Configuration self.ssh_ip = None @@ -43,13 +43,6 @@ class PerfInstance(BaseCompute): self.ssh = None self.port = None - # Redis Configuration - self.redis_obj = None - self.pubsub = None - self.up_flag = False - self.orches_chan_name = self.vm_name.lower() + "_orches" - self.report_chan_name = self.vm_name.lower() + "_report" - if 'tp_tool' not in config: self.tp_tool = None # elif config.tp_tool.lower() == 'nuttcp': @@ -61,9 +54,8 @@ class PerfInstance(BaseCompute): if 'http_tool' not in config: self.http_tool = None - elif config.http_tool.lower() == 'wrk': - self.http_tool = WrkTool(self) - self.target_server = None + elif config.http_tool.name.lower() == 'wrk': + self.http_tool = WrkTool(self, config.http_tool) self.target_url = None else: self.http_tool = None @@ -87,8 +79,7 @@ class PerfInstance(BaseCompute): tp_tool_res = [] res = {'ip_to': dest_ip} - if self.internal_ip: - res['ip_from'] = self.internal_ip + res['ip_from'] = self.ssh_ip if label: res['desc'] = label if self.az: @@ -102,23 +93,11 @@ class PerfInstance(BaseCompute): res['results'] = tp_tool_res return res - # Target URL is supposed to be provided during the mapping stage - def run_http_client(self, threads, connections, - timeout=5, connection_type="Keep-alive"): - # HTTP Performance Measurement - cmd = self.http_tool.cmd_run_client(self.target_url, - threads, - connections, - timeout, - connection_type) - parser_cb = 'self.run_http_client_parser' - self.redis_exec_command(cmd, parser_cb) - - def run_http_client_parser(self, status, stdout, stderr): + def http_client_parser(self, status, stdout, stderr): http_tool_res = self.http_tool.cmd_parser_run_client(status, stdout, stderr) - res = {'target_url': self.target_url} - if self.internal_ip: - res['ip_from'] = self.internal_ip + res = {'vm_name': self.vm_name} + res['target_url'] = self.target_url + res['ip_from'] = self.ssh_ip # consolidate results for all tools res['results'] = http_tool_res @@ -128,8 +107,6 @@ class PerfInstance(BaseCompute): # Returns True if success def setup_ssh(self, ssh_ip, ssh_user): # used for displaying the source IP in json results - if not self.internal_ip: - self.internal_ip = ssh_ip self.ssh_ip = ssh_ip self.ssh_user = ssh_user self.ssh = sshutils.SSH(self.ssh_user, self.ssh_ip, @@ -142,108 +119,6 @@ class PerfInstance(BaseCompute): (status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout) return (status, cmd_output, err) - # Setup the redis connectivity - def setup_redis(self, host=None, port=None, connection_pool=None): - if connection_pool: - self.redis_obj = redis.StrictRedis(connection_pool=connection_pool) - else: - self.redis_obj = redis.StrictRedis(host=host, port=port) - - # Check for connections to redis server - for retry in xrange(1, self.config.redis_retry_count + 1): - try: - self.redis_obj.get("test") - except (redis.exceptions.ConnectionError): - LOG.warn("Connecting to redis server... Retry #%d", retry) - time.sleep(1) - continue - break - # Subscribe to message channel - self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) - self.pubsub.subscribe(self.report_chan_name) - - return True - - def redis_get_message(self): - message = self.pubsub.get_message() - while message and message['data'] == 'hello': - # If a "hello" packet is received, the corresponding VM is up - # running. We mark the flag for that VM, and skip all "hello" - # messages received afterwards. - if self.up_flag: - message = self.pubsub.get_message() - else: - self.up_flag = True - self.redis_acknowledge_hello() - return (0, "", "") - if not message: - return None - - LOG.kbdebug(message) - msg_body = eval(message['data']) - status = int(msg_body['status']) - stdout = msg_body['stdout'] - stderr = msg_body['stderr'] - parser_cb = msg_body['parser_cb'] - - if parser_cb is not None: - stdout = eval("%s(status, stdout, stderr)" % parser_cb) - - return (status, stdout, stderr) - - def redis_acknowledge_hello(self): - self.redis_obj.publish(self.orches_chan_name, "iamhere") - - def redis_exec_command(self, cmd, parser_cb=None, timeout=30): - # TODO(Add timeout support) - msg_body = {'cmd': cmd, 'parser_cb': parser_cb} - LOG.kbdebug(msg_body) - self.redis_obj.publish(self.orches_chan_name, msg_body) - - # Check whether the HTTP Service is up running - def check_http_service(self): - cmd = 'while true; do\n' - cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (self.target_url) - cmd += 'if [ $? -eq 0 ]; then break; fi\n' - cmd += 'done' - self.redis_exec_command(cmd, None) - - # Add static route - def add_static_route(self, network, next_hop_ip, if_name=None): - debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip) - cmd = "sudo ip route add %s via %s" % (network, next_hop_ip) - if if_name: - debug_msg += " and %s" % if_name - cmd += " dev %s" % if_name - LOG.kbdebug(debug_msg) - self.redis_exec_command(cmd, None) - - # Get static route - def get_static_route(self, network, next_hop_ip=None, if_name=None): - cmd = "ip route show %s" % network - if next_hop_ip: - cmd += " via %s" % next_hop_ip - if if_name: - cmd += " dev %s" % if_name - # TODO(Need to implement a parser_cb instead of passing None) - self.redis_exec_command(cmd, None) - - # Delete static route - def delete_static_route(self, network, next_hop_ip=None, if_name=None): - debug_msg = "[%s] Deleting static route %s" % (self.vm_name, network) - cmd = "sudo ip route del %s" % network - if next_hop_ip: - debug_msg = " with next hop %s" % next_hop_ip - cmd += " via %s" % next_hop_ip - if if_name: - if next_hop_ip: - debug_msg = " and %s" % if_name - else: - debug_msg = "with next hop %s" % if_name - cmd += " dev %s" % if_name - LOG.kbdebug(debug_msg) - self.redis_exec_command(cmd, None) - # scp a file from the local host to the instance # Returns True if dest file already exists or scp succeeded # False in case of scp error diff --git a/scale/perf_tool.py b/scale/perf_tool.py index ae4c3ab..3f237e1 100644 --- a/scale/perf_tool.py +++ b/scale/perf_tool.py @@ -20,17 +20,14 @@ import log as logging LOG = logging.getLogger(__name__) -# where to copy the tool on the target, must end with slash -SCP_DEST_DIR = '/var/tmp/' - # A base class for all tools that can be associated to an instance class PerfTool(object): __metaclass__ = abc.ABCMeta - def __init__(self, name, instance): - self.name = name + def __init__(self, instance, tool_cfg): + self.name = tool_cfg.name self.instance = instance - self.dest_path = SCP_DEST_DIR + name + self.dest_path = tool_cfg.dest_path self.pid = None # Terminate pid if started diff --git a/scale/tenant.py b/scale/tenant.py index 718d0d4..115e89a 100644 --- a/scale/tenant.py +++ b/scale/tenant.py @@ -56,7 +56,8 @@ class Tenant(object): # Conflict: Conflict occurred attempting to store project - Duplicate Entry (HTTP 409) if exc.http_status != 409: raise exc - LOG.info("Tenant %s already present, reusing it" % self.tenant_name) + LOG.info("Tenant %s already present, reusing it" % self.tenant_name) + # It is a hassle to find a tenant by name as the only way seems to retrieve # the list of all tenants which can be very large tenant_list = self.kloud.keystone.tenants.list() diff --git a/scale/users.py b/scale/users.py index 0ce0a41..96fc813 100644 --- a/scale/users.py +++ b/scale/users.py @@ -40,9 +40,9 @@ class User(object): self.user_id = None self.router_list = [] # Store the neutron and nova client - self.neutron = None - self.nova = None - admin_user = self._get_user() + self.neutron_client = None + self.nova_client = None + self.admin_user = self._get_user() # Create the user within the given tenant associate # admin role with user. We need admin role for user @@ -53,10 +53,10 @@ class User(object): if role.name == user_role: current_role = role break - self.tenant.kloud.keystone.roles.add_user_role(admin_user, + self.tenant.kloud.keystone.roles.add_user_role(self.admin_user, current_role, tenant.tenant_id) - self.user_id = admin_user.id + self.user_id = self.admin_user.id def _create_user(self): LOG.info("Creating user: " + self.user_name) @@ -78,19 +78,19 @@ class User(object): # Conflict: Conflict occurred attempting to store user - Duplicate Entry (HTTP 409) if exc.http_status != 409: raise exc - # Try to repair keystone by removing that user - LOG.warn("User creation failed due to stale user with same name: " + - self.user_name) - # Again, trying to find a user by name is pretty inefficient as one has to list all - # of them - users_list = self.tenant.kloud.keystone.users.list() - for user in users_list: - if user.name == self.user_name: - # Found it, time to delete it - LOG.info("Deleting stale user with name: " + self.user_name) - self.tenant.kloud.keystone.users.delete(user) - user = self._create_user() - return user + # Try to repair keystone by removing that user + LOG.warn("User creation failed due to stale user with same name: " + + self.user_name) + # Again, trying to find a user by name is pretty inefficient as one has to list all + # of them + users_list = self.tenant.kloud.keystone.users.list() + for user in users_list: + if user.name == self.user_name: + # Found it, time to delete it + LOG.info("Deleting stale user with name: " + self.user_name) + self.tenant.kloud.keystone.users.delete(user) + user = self._create_user() + return user # Not found there is something wrong raise Exception('Cannot find stale user:' + self.user_name) @@ -119,7 +119,7 @@ class User(object): creden['tenant_name'] = self.tenant.tenant_name # Create the neutron client to be used for all operations - self.neutron = neutronclient.Client(**creden) + self.neutron_client = neutronclient.Client(**creden) # Create a new nova client for this User with correct credentials creden_nova = {} @@ -128,21 +128,20 @@ class User(object): creden_nova['auth_url'] = self.tenant.kloud.auth_url creden_nova['project_id'] = self.tenant.tenant_name creden_nova['version'] = 2 - self.nova = Client(**creden_nova) + self.nova_client = Client(**creden_nova) config_scale = self.tenant.kloud.scale_cfg # Find the external network that routers need to attach to # if redis_server is configured, we need to attach the router to the # external network in order to reach the redis_server if config_scale['use_floatingip'] or 'redis_server' in config_scale: - external_network = base_network.find_external_network(self.neutron) + external_network = base_network.find_external_network(self.neutron_client) else: external_network = None # Create the required number of routers and append them to router list - LOG.info("Creating routers for user %s" % self.user_name) + LOG.info("Creating routers and networks for user %s" % self.user_name) for router_count in range(config_scale['routers_per_user']): - router_instance = base_network.Router(self.neutron, self.nova, self.user_name, - self.tenant.kloud.shared_network) + router_instance = base_network.Router(self) self.router_list.append(router_instance) router_name = self.user_name + "-R" + str(router_count) # Create the router and also attach it to external network diff --git a/scale/wrk_tool.py b/scale/wrk_tool.py index 931579b..6353961 100644 --- a/scale/wrk_tool.py +++ b/scale/wrk_tool.py @@ -24,15 +24,15 @@ LOG = logging.getLogger(__name__) class WrkTool(PerfTool): - def __init__(self, instance): - PerfTool.__init__(self, 'wrk-4.0.1', instance) + def __init__(self, instance, cfg_http_tool): + PerfTool.__init__(self, instance, cfg_http_tool) def cmd_run_client(self, target_url, threads, connections, timeout=5, connetion_type='Keep-alive', retry_count=10): ''' Return the command for running the benchmarking tool ''' - duration_sec = self.instance.config.exec_time + duration_sec = self.instance.config.http_tool_configs.duration cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \ (self.dest_path, threads, connections, duration_sec, timeout, target_url) LOG.kbdebug("[%s] %s" % (self.instance.vm_name, cmd)) @@ -41,6 +41,7 @@ class WrkTool(PerfTool): def cmd_parser_run_client(self, status, stdout, stderr): if status: return [self.parse_error(stderr)] + # Sample Output: # Running 10s test @ http://192.168.1.1/index.html # 8 threads and 5000 connections