Use redis as the communication protocol

1. Added support for using redis as the orchestration and reporting protocol;
2. Implemented the agent to run on every testing VM;
3. Modified the HTTP tools to use redis messaging APIs;
4. Added APIs to display provision details for klouds;
5. Added the option to allow a prompt before running tools;
6. Cleaned up unneeded code;

Change-Id: I221ebbeb8a3001374699617dbd827de269419dab
This commit is contained in:
Yichen Wang 2015-04-16 15:19:51 -07:00
parent 3a8ad162e3
commit 8a9dda8f7d
14 changed files with 448 additions and 345 deletions

View File

@ -10,6 +10,7 @@ ecdsa>=0.11
jsonpatch>=1.9
jsonschema>=2.4.0
lxml>=3.4.0
oslo.log>=1.0.0
oslo.utils>=1.2.0
paramiko>=1.14.0
pycrypto>=2.6.1
@ -19,5 +20,6 @@ python-neutronclient<3,>=2.3.6
python-novaclient>=2.18.1
python-openstackclient>=0.4.1
python-keystoneclient>=1.0.0
redis>=2.10.3
scp>=0.8.0
tabulate>=0.7.3

View File

@ -27,10 +27,10 @@ class BaseCompute(object):
"""
def __init__(self, nova_client, user_name):
def __init__(self, vm_name, nova_client, user_name):
self.novaclient = nova_client
self.user_name = user_name
self.vm_name = None
self.vm_name = vm_name
self.instance = None
self.fip = None
self.fip_ip = None
@ -43,11 +43,9 @@ class BaseCompute(object):
# Create a server instance with associated
# security group, keypair with a provided public key
def create_server(self, vmname, image_name, flavor_type, keyname,
nic, sec_group, public_key_file,
avail_zone=None, user_data=None,
config_drive=None,
retry_count=100):
def create_server(self, image_name, flavor_type, keyname,
nic, sec_group, avail_zone=None, user_data=None,
config_drive=None, retry_count=100):
"""
Create a VM instance given following parameters
1. VM Name
@ -59,12 +57,11 @@ class BaseCompute(object):
"""
# Get the image id and flavor id from their logical names
self.vm_name = vmname
image = self.find_image(image_name)
flavor_type = self.find_flavor(flavor_type)
# Also attach the created security group for the test
instance = self.novaclient.servers.create(name=vmname,
instance = self.novaclient.servers.create(name=self.vm_name,
image=image,
flavor=flavor_type,
key_name=keyname,
@ -73,7 +70,7 @@ class BaseCompute(object):
userdata=user_data,
config_drive=config_drive,
security_groups=[sec_group.id])
flag_exist = self.find_server(vmname, retry_count)
flag_exist = self.find_server(self.vm_name, retry_count)
if flag_exist:
self.instance = instance
@ -165,10 +162,9 @@ class SecGroup(object):
self.novaclient.security_groups.delete(self.secgroup)
break
except Exception:
LOG.warn("Security group %s in use retry count: %d" % (
self.secgroup_name,
retry_count))
time.sleep(4)
LOG.warn("Security group %s in use. Retry #%d" % (
self.secgroup_name, retry_count))
time.sleep(2)
class KeyPair(object):

View File

@ -105,14 +105,14 @@ class BaseNetwork(object):
for secgroup_count in range(config_scale['secgroups_per_network']):
secgroup_instance = base_compute.SecGroup(self.nova_client)
self.secgroup_list.append(secgroup_instance)
secgroup_name = network_prefix + "_SG" + str(secgroup_count)
secgroup_name = network_prefix + "-SG" + str(secgroup_count)
secgroup_instance.create_secgroup_with_rules(secgroup_name)
# Create the keypair list
for keypair_count in range(config_scale['keypairs_per_network']):
keypair_instance = base_compute.KeyPair(self.nova_client)
self.keypair_list.append(keypair_instance)
keypair_name = network_prefix + "_K" + str(keypair_count)
keypair_name = network_prefix + "-K" + str(keypair_count)
keypair_instance.add_public_key(keypair_name, config_scale['public_key_file'])
# Create the required number of VMs
@ -120,30 +120,34 @@ class BaseNetwork(object):
if config_scale['use_floatingip']:
external_network = find_external_network(self.neutron_client)
LOG.info("Creating Virtual machines for user %s" % self.user_name)
if 'redis_server' in config_scale:
# Here we are creating a testing VM (client), put the redis server
# information in the user_data.
redis_server = config_scale['redis_server']
redis_server_port = config_scale['redis_server_port']
user_data = redis_server + ":" + str(redis_server_port)
else:
user_data = None
for instance_count in range(config_scale['vms_per_network']):
perf_instance = PerfInstance(self.nova_client, self.user_name)
vm_name = network_prefix + "-I" + str(instance_count)
perf_instance = PerfInstance(vm_name, self.nova_client, self.user_name, config_scale)
self.instance_list.append(perf_instance)
vm_name = network_prefix + "_I" + str(instance_count)
nic_used = [{'net-id': self.network['id']}]
LOG.info("Creating Instance: " + vm_name)
perf_instance.create_server(vm_name, config_scale['image_name'],
perf_instance.create_server(config_scale['image_name'],
config_scale['flavor_type'],
self.keypair_list[0].keypair_name,
nic_used,
self.secgroup_list[0].secgroup,
config_scale['public_key_file'],
None,
None,
None)
user_data=user_data)
# Store the subnet info and fixed ip address in instance
perf_instance.subnet_ip = self.network['subnet_ip']
LOG.info(perf_instance.instance.networks.values())
LOG.info("++++++++++++++++++++++++++++++")
perf_instance.fixed_ip = perf_instance.instance.networks.values()[0][0]
if self.shared_interface_ip:
perf_instance.shared_interface_ip = self.shared_interface_ip
# Create the floating ip for the instance store it and the ip address in instance object
if config_scale['use_floatingip']:
# Create the floating ip for the instance
# store it and the ip address in instance object
perf_instance.fip = create_floating_ip(self.neutron_client, external_network)
perf_instance.fip_ip = perf_instance.fip['floatingip']['floating_ip_address']
# Associate the floating ip with this instance
@ -152,11 +156,6 @@ class BaseNetwork(object):
else:
# Store the fixed ip as ssh ip since there is no floating ip
perf_instance.ssh_ip = perf_instance.fixed_ip
LOG.info("VM Information")
LOG.info("SSH IP:%s" % perf_instance.ssh_ip)
LOG.info("Subnet Info: %s" % perf_instance.subnet_ip)
if self.shared_interface_ip:
LOG.info("Shared router interface ip %s" % self.shared_interface_ip)
def delete_compute_resources(self):
"""
@ -182,7 +181,7 @@ class BaseNetwork(object):
"""
Create a network with 1 subnet inside it
"""
subnet_name = "kloudbuster_subnet" + network_name
subnet_name = "kloudbuster_subnet_" + network_name
body = {
'network': {
'name': network_name,
@ -263,7 +262,7 @@ class Router(object):
self.shared_interface_ip)
self.network_list.append(network_instance)
# Create the network and subnet
network_name = self.user_name + "_N" + str(network_count)
network_name = self.user_name + "-N" + str(network_count)
network_instance.create_network_and_subnet(network_name)
# Attach the created network to router interface
self.attach_router_interface(network_instance)

View File

@ -1,6 +0,0 @@
# KloudBuster Default configuration file
server:
flavor_type: 'CO2-Large'
client:
flavor_type: 'CO2-Large'

View File

@ -1,7 +1,7 @@
# KloudBuster Default configuration file
server:
server:
# Number of tenants to be created on the cloud
number_tenants: 2
number_tenants: 1
# Number of Users to be created inside the tenant
users_per_tenant: 1
@ -15,23 +15,29 @@ server:
routers_per_user: 1
# Number of VM instances to be created within the context of each User
vms_per_network: 1
vms_per_network: 2
# Number of security groups per network
secgroups_per_network: 1
# Number of keypairs per network
keypairs_per_network: 1
# Assign floating IP for every VM
use_floatingip: False
# SSH configuration
ssh_vm_username: 'ubuntu'
ssh_retry_count: 50
private_key_file: './ssh/id_rsa'
# Configs that remain constant
keystone_admin_role: "admin"
cleanup_resources : True
public_key_file : '../ssh/id_rsa.pub'
image_name : 'Scale Image v3'
cleanup_resources: True
public_key_file: '../ssh/id_rsa.pub'
image_name: 'Scale Image v4'
flavor_type: 'm1.small'
client:
# Number of tenants to be created on the cloud
number_tenants: 1
@ -48,7 +54,7 @@ client:
routers_per_user: 1
# Number of VM instances to be created within the context of each User
vms_per_network: 2
vms_per_network: 2
# Number of security groups per network
secgroups_per_network: 1
@ -56,14 +62,36 @@ client:
# Number of keypairs per network
keypairs_per_network: 1
use_floatingip: True
# Assign floating IP for every VM
use_floatingip: False
# Specify whether the testing cloud is running in same cloud
run_on_same_cloud: True
# SSH configuration
ssh_vm_username: 'ubuntu'
ssh_retry_count: 50
private_key_file: './ssh/id_rsa'
# Redis server configuration
redis_server: '172.29.172.180'
redis_server_port: 6379
redis_retry_count: 50
polling_interval: 5
# Duration of testing tools (seconds)
exec_time: 30
# Tooling
tp_tool: 'nuttcp'
http_tool: 'wrk'
# Prompt before running benchmarking tools
prompt_before_run: False
# Configs that remain constant
keystone_admin_role: "admin"
cleanup_resources : True
public_key_file : '../ssh/id_rsa.pub'
image_name : 'Scale Image v3'
cleanup_resources: True
public_key_file: '../ssh/id_rsa.pub'
image_name: 'Scale Image v4'
flavor_type: 'm1.small'

View File

@ -12,128 +12,137 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import traceback
import time
import log as logging
import sshutils
import redis
LOG = logging.getLogger(__name__)
class KBVMUpException(Exception):
pass
class KBSetStaticRouteException(Exception):
pass
class KBHTTPServerUpException(Exception):
pass
class KBHTTPBenchException(Exception):
pass
class KBScheduler(object):
"""
Control the slave nodes on the testing cloud
Control the testing VMs on the testing cloud
"""
"""
The code below are mostly a temporary solution, which assumes all testing
clients have their own floating IP. However, this is usually not true for
a real use case.
def __init__(self, client_list, config):
self.client_list = client_list
self.config = config
self.result = {}
self.redis_connection_pool = None
Will replace below code. and take advantage of kafka framework
"""
def __init__(self, kb_master=None):
self.kb_master = kb_master
self.client_status = {}
self.client_result = {}
def check_server_httpd(self, instance, retry_count=60):
def polling_vms(self, timeout, polling_interval=None):
'''
Check the target server is up running
Polling all VMs for the status of execution
Guarantee to run once if the timeout is less than polling_interval
'''
LOG.info("[%s] Waiting for HTTP Server to come up..." %
instance.vm_name)
cmd = 'curl --head %s --connect-timeout 2' % (instance.target_url)
for retry in range(1, retry_count + 1):
try:
(status, _, _) = instance.exec_command(cmd)
except Exception as e:
traceback.print_exc()
self.client_status[instance.vm_name] = "ERROR: %s" % e.message
return
if not status:
return
LOG.debug("[%s] Waiting for HTTP Server to come up... Retry %d#" %
(instance.vm_name, retry))
if not polling_interval:
polling_interval = self.config.polling_interval
retry_count = max(timeout / polling_interval, 1)
retry = cnt_succ = cnt_failed = 0
clist = self.client_list
def setup_static_route(self, instance):
svr = instance.target_server
if not svr.fip_ip:
if svr.subnet_ip not in instance.get_static_route(svr.subnet_ip):
rc = instance.add_static_route(svr.subnet_ip,
svr.shared_interface_ip)
if rc > 0:
LOG.error("Failed to add static route, error code: %i." %
rc)
raise KBSetStaticRouteException()
while (retry < retry_count and len(clist)):
time.sleep(polling_interval)
for instance in clist:
msg = instance.redis_get_message()
if not msg:
# No new message, command in executing
continue
elif msg[0]:
# Command returned with non-zero status, command failed
cnt_failed = cnt_failed + 1
else:
# Command returned with zero, command succeed
cnt_succ = cnt_succ + 1
# Current instance finished execution
self.result[instance.vm_name] = msg
clist = [x for x in clist if x != instance]
LOG.info("%d Succeed, %d Failed, %d Pending... Retry #%d" %
(cnt_succ, cnt_failed, len(clist), retry))
retry = retry + 1
return (cnt_succ, cnt_failed, len(clist))
def wait_for_vm_up(self, timeout=120):
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
raise KBVMUpException()
def setup_static_route(self, timeout=10):
for instance in self.client_list:
svr = instance.target_server
instance.add_static_route(svr.subnet_ip, svr.shared_interface_ip)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
raise KBSetStaticRouteException()
def check_http_server_up(self, timeout=60):
for instance in self.client_list:
instance.check_http_service()
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
raise KBHTTPServerUpException()
def run_http_test(self):
for instance in self.client_list:
instance.run_http_client(threads=2, connections=5000, timeout=5,
connection_type="Keep-alive")
# Give additional 30 seconds for everybody to report results
timeout = self.config.exec_time + 30
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_list):
raise KBHTTPBenchException()
def run(self):
LOG.info("Setting up redis connection pool...")
# For now, the redis server is not in the scope of Kloud Buster, which has to be
# pre-configured before executing Kloud Buster.
self.redis_connection_pool = redis.ConnectionPool(
host=self.config.redis_server, port=self.config.redis_server_port, db=0)
def setup_testing_env(self, instance):
try:
instance.setup_ssh(instance.fip_ip or instance.fixed_ip, "ubuntu")
self.setup_static_route(instance)
self.check_server_httpd(instance)
except (sshutils.SSHError):
self.client_status[instance.vm_name] = "ERROR: Could not setup SSH Session."
return
LOG.info("Setting up the redis connections...")
for instance in self.client_list:
instance.setup_redis(connection_pool=self.redis_connection_pool)
LOG.info("Waiting for agents on VMs to come up...")
self.wait_for_vm_up()
LOG.info("Setting up static route to reach tested cloud...")
self.setup_static_route()
LOG.info("Waiting for HTTP service to come up...")
self.check_http_server_up()
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
raw_input()
LOG.info("Starting HTTP Benchmarking...")
self.run_http_test()
for key in self.result:
# TODO(Consolidating the data from all VMs)
print "[%s] %s" % (key, self.result[key][1])
except (KBSetStaticRouteException):
self.client_status[instance.vm_name] = "ERROR: Could not set static route."
LOG.error("Could not set static route.")
return
except (KBHTTPServerUpException):
LOG.error("HTTP service is not up in testing cloud.")
return
except KBHTTPBenchException():
LOG.error("Error in HTTP benchmarking.")
return
def run_test(self, instance):
try:
self.client_result[instance.vm_name] =\
instance.run_http_client(threads=2, connections=5000,
timeout=5, connection_type="Keep-alive")
except Exception as e:
traceback.print_exc()
self.client_status[instance.vm_name] = "ERROR: %s" % e.message
def run(self, client_list):
# Wait for kb_master and all clients to come up
# if not self.check_up_with_sshd(self.kb_master):
# raise
thread_list = []
error_flag = False
# Pre-allocate the dictionary
for cur_client in client_list:
self.client_status[cur_client.vm_name] = None
LOG.info("Setting up the testing environments...")
for cur_client in client_list:
self.client_status[cur_client.vm_name] = "Success"
t = threading.Thread(target=self.setup_testing_env, args=[cur_client])
thread_list.append(t)
t.start()
for cur_thread in thread_list:
cur_thread.join()
for cur_client in client_list:
vm_name = cur_client.vm_name
if self.client_status[vm_name] != "Success":
error_flag = True
LOG.info("%s: %s" % (vm_name, self.client_status[vm_name]))
if error_flag:
raise
LOG.info("TEST STARTED")
thread_list = []
for cur_client in client_list:
self.client_status[cur_client.vm_name] = "Success"
t = threading.Thread(target=self.run_test, args=[cur_client])
thread_list.append(t)
t.start()
for cur_thread in thread_list:
cur_thread.join()
for cur_client in client_list:
vm_name = cur_client.vm_name
if self.client_status[vm_name] == "Success":
LOG.info("%s: %s" % (vm_name, self.client_result[vm_name]))
else:
LOG.error("%s: %s" % (vm_name, self.client_status[vm_name]))

102
scale/kb_vm_agent.py Normal file
View File

@ -0,0 +1,102 @@
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import socket
import subprocess
import sys
import threading
import time
import redis
class KB_VM_Agent(object):
    """Agent that runs on every testing VM spawned by Kloud Buster.

    It subscribes to a per-VM "orchestration" redis channel for commands
    from the master, executes each command locally via bash, and publishes
    the results back on a per-VM "report" channel.
    """

    def __init__(self, host, port=6379):
        # host/port: address of the master's redis server
        self.redis_obj = redis.StrictRedis(host=host, port=port)
        # Skip subscribe/unsubscribe confirmations so listen() only yields
        # real payload messages.
        self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
        self.hello_thread = None
        # Event used to stop the periodic "hello" heartbeat once the master
        # has acknowledged this VM (see work()).
        self.stop_hello = threading.Event()
        # Assumption:
        # Here we assume the vm_name is the same as the host name, which is
        # true if the VM is spawned by Kloud Buster.
        self.vm_name = socket.gethostname().lower()
        # Channel names are derived from the VM name so the master can
        # address each VM individually.
        self.orches_chan_name = self.vm_name.lower() + "_orches"
        self.report_chan_name = self.vm_name.lower() + "_report"

    def setup_channels(self):
        """Block until the redis server is reachable, then subscribe."""
        # Check for connections to redis server (any successful command
        # proves connectivity; the value of the "test" key is irrelevant)
        while (True):
            try:
                self.redis_obj.get("test")
            except (redis.exceptions.ConnectionError):
                time.sleep(1)
                continue
            break
        # Subscribe to orchestration channel
        self.pubsub.subscribe(self.orches_chan_name)

    def exec_command(self, cmd):
        """Run *cmd* through bash and return (returncode, stdout, stderr)."""
        # Execute the command, and returns the outputs
        cmds = ['bash', '-c']
        cmds.append(cmd)
        p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        return (p.returncode, stdout, stderr)

    def report(self, result):
        """Publish *result* on this VM's report channel."""
        self.redis_obj.publish(self.report_chan_name, result)

    def process_cmd(self, cmd_data):
        """Execute one command request and report its outcome.

        cmd_data is expected to carry 'cmd' (shell command string) and
        'parser_cb' (opaque callback name echoed back to the master).
        """
        cmd_res_tuple = self.exec_command(cmd_data['cmd'])
        cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
        cmd_res_dict['parser_cb'] = cmd_data['parser_cb']
        self.report(cmd_res_dict)

    def send_hello(self):
        """Heartbeat loop run in a separate thread (see module __main__)."""
        # Sending "hello" message to master node every 2 seconds
        while not self.stop_hello.is_set():
            self.report("hello")
            time.sleep(2)

    def work(self):
        """Main loop: consume orchestration messages forever."""
        for item in self.pubsub.listen():
            if item['type'] != 'message':
                continue
            if item['data'] == 'iamhere':
                # When an "iamhere" packet is received, it means the master
                # node acknowledged the current VM, so stop sending more
                # "hello" packets to the master node.
                # Unfortunately, there is no thread.stop() in Python 2.x
                self.stop_hello.set()
                continue
            # Convert the string representation of dict to real dict obj.
            # HACK/security: eval() on data received over the network is
            # dangerous — anyone who can publish to this redis channel can
            # execute arbitrary Python here. ast.literal_eval (or JSON)
            # would be a safer wire format; flagged for follow-up.
            cmd_data = eval(item['data'])
            self.process_cmd(cmd_data)
if __name__ == "__main__":
    # Usage: kb_vm_agent.py <redis_server_host>:<redis_server_port>
    if (len(sys.argv) <= 1):
        print("ERROR: Expecting the redis server address.")
        sys.exit(1)
    # The single argument is "<host>:<port>"; split on the first ':' only
    # so IPv4 addresses and hostnames pass through untouched.
    redis_server, redis_server_port = sys.argv[1].split(':', 1)
    # Bug fix: sys.argv yields the port as a string, but the redis client
    # expects an integer port number — convert before building the agent.
    agent = KB_VM_Agent(redis_server, int(redis_server_port))
    agent.setup_channels()
    # Heartbeat runs in a daemon thread so it never blocks process exit;
    # it stops on its own once the master acknowledges this VM.
    agent.hello_thread = threading.Thread(target=agent.send_hello)
    agent.hello_thread.daemon = True
    agent.hello_thread.start()
    # Enter the blocking orchestration loop (never returns).
    agent.work()

View File

@ -22,6 +22,7 @@ from keystoneclient.v2_0 import client as keystoneclient
import log as logging
from novaclient.exceptions import ClientException
from oslo_config import cfg
from tabulate import tabulate
import tenant
import credentials
@ -73,7 +74,7 @@ class Kloud(object):
def create_resources(self, shared_net=None):
self.shared_network = shared_net
for tenant_count in xrange(self.scale_cfg['number_tenants']):
tenant_name = self.prefix + "_T" + str(tenant_count)
tenant_name = self.prefix + "-T" + str(tenant_count)
new_tenant = tenant.Tenant(tenant_name, self)
self.tenant_list.append(new_tenant)
new_tenant.create_resources()
@ -108,7 +109,7 @@ class KloudBuster(object):
self.tenant = None
self.tenant_list_testing = []
self.tenant_testing = None
# to do : check on same auth_url instead
# TODO(check on same auth_url instead)
if cred == testing_cred:
self.single_cloud = True
else:
@ -116,15 +117,27 @@ class KloudBuster(object):
self.kloud = Kloud(config_scale.server, cred)
self.testing_kloud = Kloud(config_scale.client, testing_cred, testing_side=True)
def print_vms_info(self, role):
pass
def print_provision_info(self):
"""
Function that iterates and prints all VM info
for tested and testing cloud
"""
pass
table = [["VM Name", "Internal IP", "Floating IP", "Subnet", "Shared Interface IP"]]
client_list = self.kloud.get_all_instances()
for instance in client_list:
row = [instance.vm_name, instance.fixed_ip, instance.fip_ip, instance.subnet_ip,
instance.shared_interface_ip]
table.append(row)
LOG.info('Provision Details (Tested Kloud)\n' +
tabulate(table, headers="firstrow", tablefmt="psql"))
table = [["VM Name", "Internal IP", "Floating IP", "Subnet"]]
client_list = self.testing_kloud.get_all_instances()
for instance in client_list:
row = [instance.vm_name, instance.fixed_ip, instance.fip_ip, instance.subnet_ip]
table.append(row)
LOG.info('Provision Details (Testing Kloud)\n' +
tabulate(table, headers="firstrow", tablefmt="psql"))
def run(self):
"""
@ -155,8 +168,8 @@ class KloudBuster(object):
client_list[idx].target_url = "http://%s/index.html" %\
(svr.fip_ip or svr.fixed_ip)
kbscheduler = kb_scheduler.KBScheduler()
kbscheduler.run(client_list)
kbscheduler = kb_scheduler.KBScheduler(client_list, config_scale.client)
kbscheduler.run()
except KeyboardInterrupt:
traceback.format_exc()
except (sshutils.SSHError, ClientException, Exception):

View File

@ -28,7 +28,7 @@ CONF = cfg.CONF
CONF.register_cli_opts(DEBUG_OPTS)
oslogging.register_options(CONF)
logging.KBDEBUG = logging.DEBUG + 1
logging.KBDEBUG = logging.DEBUG + 5
logging.addLevelName(logging.KBDEBUG, "KBDEBUG")
CRITICAL = logging.CRITICAL
@ -41,7 +41,6 @@ KBDEBUG = logging.KBDEBUG
WARN = logging.WARN
WARNING = logging.WARNING
def setup(product_name, version="unknown"):
dbg_color = handlers.ColorHandler.LEVEL_COLORS[logging.DEBUG]
handlers.ColorHandler.LEVEL_COLORS[logging.KBDEBUG] = dbg_color

View File

@ -1,4 +1,4 @@
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,53 +14,55 @@
#
import os
import re
import stat
import subprocess
import time
import sshutils
from base_compute import BaseCompute
import log as logging
import redis
from wrk_tool import WrkTool
LOG = logging.getLogger(__name__)
# An openstack instance (can be a VM or a LXC)
class PerfInstance(BaseCompute):
def __init__(self, nova_client, user_name, config=None, is_server=False):
BaseCompute.__init__(self, nova_client, user_name)
if not config:
# HACK ALERT!!!
# We are expecting to see a valid config, here we just hack
class config:
ssh_vm_username = "ubuntu"
tp_tool = None
http_tool = WrkTool
perf_tool_path = './tools'
private_key_file = './ssh/id_rsa'
ssh_retry_count = 50
debug = True
time = 30
vm_bandwidth = None
def __init__(self, vm_name, nova_client, user_name, config, is_server=False):
BaseCompute.__init__(self, vm_name, nova_client, user_name)
self.config = config
self.internal_ip = None
self.is_server = is_server
# SSH Configuration
self.ssh_ip = None
self.ssh_user = config.ssh_vm_username
self.ssh = None
self.port = None
self.is_server = is_server
if config.tp_tool:
self.tp_tool = config.tp_tool(self, config.perf_tool_path)
else:
# Redis Configuration
self.redis_obj = None
self.pubsub = None
self.up_flag = False
self.orches_chan_name = self.vm_name.lower() + "_orches"
self.report_chan_name = self.vm_name.lower() + "_report"
if 'tp_tool' not in config:
self.tp_tool = None
if config.http_tool:
self.http_tool = config.http_tool(self, config.perf_tool_path)
# elif config.tp_tool.lower() == 'nuttcp':
# self.tp_tool = nuttcp_tool.NuttcpTool
# elif opts.tp_tool.lower() == 'iperf':
# self.tp_tool = iperf_tool.IperfTool
# else:
# self.tp_tool = None
if 'http_tool' not in config:
self.http_tool = None
elif config.http_tool.lower() == 'wrk':
self.http_tool = WrkTool(self)
self.target_server = None
self.target_url = None
else:
@ -68,6 +70,7 @@ class PerfInstance(BaseCompute):
def run_tp_client(self, label, dest_ip, target_instance,
mss=None, bandwidth=0, bidirectional=False, az_to=None):
# NOTE: This function will not work, and pending to convert to use redis
'''test iperf client using the default TCP window size
(tcp window scaling is normally enabled by default so setting explicit window
size is not going to help achieve better results)
@ -103,19 +106,19 @@ class PerfInstance(BaseCompute):
def run_http_client(self, threads, connections,
timeout=5, connection_type="Keep-alive"):
# HTTP Performance Measurement
if self.http_tool:
http_tool_res = self.http_tool.run_client(self.target_url,
threads,
connections,
timeout,
connection_type)
res = {'target_url': self.target_url}
if self.internal_ip:
res['ip_from'] = self.internal_ip
res['distro_id'] = self.ssh.distro_id
res['distro_version'] = self.ssh.distro_version
else:
http_tool_res = []
cmd = self.http_tool.cmd_run_client(self.target_url,
threads,
connections,
timeout,
connection_type)
parser_cb = 'self.run_http_client_parser'
self.redis_exec_command(cmd, parser_cb)
def run_http_client_parser(self, status, stdout, stderr):
http_tool_res = self.http_tool.cmd_parser_run_client(status, stdout, stderr)
res = {'target_url': self.target_url}
if self.internal_ip:
res['ip_from'] = self.internal_ip
# consolidate results for all tools
res['results'] = http_tool_res
@ -137,80 +140,83 @@ class PerfInstance(BaseCompute):
# Send a command on the ssh session
def exec_command(self, cmd, timeout=30):
(status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout)
# if status:
# LOG.error("[%s] cmd=%s" % (self.vm_name, cmd))
# if cmd_output:
# LOG.error("[%s] stdout=%s" % (self.vm_name, cmd_output))
# if err:
# LOG.error("[%s] stderr=%s" % (self.vm_name, err))
# LOG.kbdebug("[%s] %s" % (self.vm_name, cmd_output))
return (status, cmd_output, err)
# Ping an IP from this instance
def ping_check(self, target_ip, ping_count, pass_threshold):
return self.ssh.ping_check(target_ip, ping_count, pass_threshold)
# Given a message size verify if ping without fragmentation works or fails
# Returns True if success
def ping_do_not_fragment(self, msg_size, ip_address):
cmd = "ping -M do -c 1 -s " + str(msg_size) + " " + ip_address
(_, cmd_output, _) = self.exec_command(cmd)
match = re.search('100% packet loss', cmd_output)
if match:
return False
# Setup the redis connectivity
def setup_redis(self, host=None, port=None, connection_pool=None):
if connection_pool:
self.redis_obj = redis.StrictRedis(connection_pool=connection_pool)
else:
return True
self.redis_obj = redis.StrictRedis(host=host, port=port)
# Set the interface IP address and mask
def set_interface_ip(self, if_name, ip, mask):
LOG.kbdebug("[%s] Setting interface %s to %s mask %s" % (self.vm_name,
if_name, ip,
mask))
cmd2apply = "sudo ifconfig %s %s netmask %s" % (if_name, ip, mask)
(rc, _, _) = self.ssh.execute(cmd2apply)
return rc
# Check for connections to redis server
for retry in xrange(1, self.config.redis_retry_count + 1):
try:
self.redis_obj.get("test")
except (redis.exceptions.ConnectionError):
LOG.warn("Connecting to redis server... Retry #%d", retry)
time.sleep(1)
continue
break
# Subscribe to message channel
self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
self.pubsub.subscribe(self.report_chan_name)
# Get an interface IP address (returns None if error)
def get_interface_ip(self, if_name):
LOG.kbdebug("[%s] Getting interface %s IP and mask" % (self.vm_name,
if_name))
cmd2apply = "ifconfig %s" % (if_name)
(rc, res, _) = self.ssh.execute(cmd2apply)
if rc:
return True
def redis_get_message(self):
message = self.pubsub.get_message()
while message and message['data'] == 'hello':
# If a "hello" packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all "hello"
# messages received afterwards.
if self.up_flag:
message = self.pubsub.get_message()
else:
self.up_flag = True
self.redis_acknowledge_hello()
return (0, "", "")
if not message:
return None
# eth5 Link encap:Ethernet HWaddr 90:e2:ba:40:74:05
# inet addr:172.29.87.29 Bcast:172.29.87.31 Mask:255.255.255.240
# inet6 addr: fe80::92e2:baff:fe40:7405/64 Scope:Link
match = re.search(r'inet addr:([\d\.]*) ', res)
if not match:
return None
return match.group(1)
# Set an interface MTU to passed in value
def set_interface_mtu(self, if_name, mtu):
LOG.kbdebug("[%s] Setting interface %s mtu to %d" % (self.vm_name,
if_name, mtu))
cmd2apply = "sudo ifconfig %s mtu %d" % (if_name, mtu)
(rc, _, _) = self.ssh.execute(cmd2apply)
return rc
LOG.kbdebug(message)
msg_body = eval(message['data'])
status = int(msg_body['status'])
stdout = msg_body['stdout']
stderr = msg_body['stderr']
parser_cb = msg_body['parser_cb']
# Get the MTU of an interface
def get_interface_mtu(self, if_name):
cmd = "cat /sys/class/net/%s/mtu" % (if_name)
(_, cmd_output, _) = self.exec_command(cmd)
return int(cmd_output)
if parser_cb is not None:
stdout = eval("%s(status, stdout, stderr)" % parser_cb)
return (status, stdout, stderr)
def redis_acknowledge_hello(self):
self.redis_obj.publish(self.orches_chan_name, "iamhere")
def redis_exec_command(self, cmd, parser_cb=None, timeout=30):
# TODO(Add timeout support)
msg_body = {'cmd': cmd, 'parser_cb': parser_cb}
LOG.kbdebug(msg_body)
self.redis_obj.publish(self.orches_chan_name, msg_body)
# Check whether the HTTP Service is up running
def check_http_service(self):
cmd = 'while true; do\n'
cmd += 'curl --head %s --connect-timeout 2 --silent\n' % (self.target_url)
cmd += 'if [ $? -eq 0 ]; then break; fi\n'
cmd += 'done'
self.redis_exec_command(cmd, None)
# Add static route
def add_static_route(self, network, next_hop_ip, if_name=None):
debug_msg = "[%s] Adding static route %s with next hop %s" % (
self.vm_name, network,
next_hop_ip)
debug_msg = "Adding static route %s with next hop %s" % (network, next_hop_ip)
cmd = "sudo ip route add %s via %s" % (network, next_hop_ip)
if if_name:
debug_msg += " and %s" % if_name
cmd += " dev %s" % if_name
LOG.kbdebug(debug_msg)
return self.ssh.execute(cmd)[0]
self.redis_exec_command(cmd, None)
# Get static route
def get_static_route(self, network, next_hop_ip=None, if_name=None):
@ -219,11 +225,8 @@ class PerfInstance(BaseCompute):
cmd += " via %s" % next_hop_ip
if if_name:
cmd += " dev %s" % if_name
(rc, out, err) = self.ssh.execute(cmd)
if rc:
return err
else:
return out
# TODO(Need to implement a parser_cb instead of passing None)
self.redis_exec_command(cmd, None)
# Delete static route
def delete_static_route(self, network, next_hop_ip=None, if_name=None):
@ -239,7 +242,7 @@ class PerfInstance(BaseCompute):
debug_msg = "with next hop %s" % if_name
cmd += " dev %s" % if_name
LOG.kbdebug(debug_msg)
return self.ssh.execute(cmd)[0]
self.redis_exec_command(cmd, None)
# scp a file from the local host to the instance
# Returns True if dest file already exists or scp succeeded
@ -274,14 +277,11 @@ class PerfInstance(BaseCompute):
return False
return True
def get_cmd_duration(self):
'''Get the duration of the client run
Will normally return the time configured in config.time
'''
return self.config.time
# Dispose the ssh session
def dispose(self):
if self.ssh:
self.ssh.close()
self.ssh = None
if self.redis_obj:
self.pubsub.unsubscribe()
self.pubsub.close()

View File

@ -27,12 +27,11 @@ SCP_DEST_DIR = '/var/tmp/'
class PerfTool(object):
__metaclass__ = abc.ABCMeta
def __init__(self, name, perf_tool_path, instance):
def __init__(self, name, instance):
self.name = name
self.instance = instance
self.dest_path = SCP_DEST_DIR + name
self.pid = None
self.perf_tool_path = perf_tool_path
# Terminate pid if started
def dispose(self):
@ -55,7 +54,7 @@ class PerfTool(object):
res['throughput_kbps'] = throughput
if protocol is not None:
res['protocol'] = protocol
if self.instance.config.vm_bandwidth:
if 'vm_bandwidth' in self.instance.config:
res['bandwidth_limit_kbps'] = self.instance.config.vm_bandwidth
if lossrate is not None:
res['loss_rate'] = lossrate
@ -81,34 +80,13 @@ class PerfTool(object):
res['http_err'] = http_err
return res
def get_boost_client_cmd(self):
    """Build the shell command that boosts client-side network limits.

    The returned one-liner raises the open-file limit and then applies a
    series of sysctl tweaks (socket buffer sizes, TIME_WAIT handling,
    SYN backlog, neighbor/ARP cache thresholds, rp/arp filters) so the
    client VM can sustain a large number of concurrent connections.
    """
    tweaks = [
        'ulimit -n 102400',
        'sysctl -w fs.file-max=6553550',
        'sysctl -w net.core.wmem_max=8388608',
        'sysctl -w net.core.wmem_default=8388608',
        'sysctl -w net.core.rmem_max=33554432',
        'sysctl -w net.core.rmem_default=33554432',
        'sysctl -w net.core.netdev_max_backlog=100000',
        'sysctl -w net.ipv4.icmp_ratelimit=0',
        'sysctl -w net.ipv4.tcp_tw_recycle=1',
        'sysctl -w net.ipv4.tcp_tw_reuse=1',
        'sysctl -w net.ipv4.tcp_max_tw_buckets=65536',
        'sysctl -w net.ipv4.tcp_fin_timeout=15',
        'sysctl -w net.ipv4.tcp_max_syn_backlog=65536',
        'sysctl -w net.ipv4.tcp_syncookies=1',
        'sysctl -w net.ipv4.neigh.default.gc_thresh1=4096',
        'sysctl -w net.ipv4.neigh.default.gc_thresh2=4096',
        'sysctl -w net.ipv4.neigh.default.gc_thresh3=4096',
        'sysctl -w net.ipv4.conf.all.rp_filter=0',
        'sysctl -w net.ipv4.conf.all.arp_filter=0',
        'sysctl -w net.ipv4.conf.default.rp_filter=0',
        'sysctl -w net.ipv4.conf.default.arp_filter=0',
        'sysctl -w net.ipv4.conf.eth0.rp_filter=0',
        'sysctl -w net.ipv4.conf.eth0.arp_filter=0',
    ]
    # Joined with '&&' so a failing tweak aborts the whole sequence.
    return ' && '.join(tweaks)
@abc.abstractmethod
def cmd_run_client(**kwargs):
# must be implemented by sub classes
return None
@abc.abstractmethod
def run_client(**kwargs):
def cmd_parser_run_client(self, status, stdout, stderr):
# must be implemented by sub classes
return None

View File

@ -49,7 +49,7 @@ class Tenant(object):
LOG.info("Creating tenant: " + self.tenant_name)
self.tenant_object = \
self.kloud.keystone.tenants.create(tenant_name=self.tenant_name,
description="Test tenant",
description="KloudBuster tenant",
enabled=True)
except keystone_exception.Conflict as exc:
# Most likely the entry already exists:
@ -74,7 +74,7 @@ class Tenant(object):
# Loop over the required number of users and create resources
for user_count in xrange(self.kloud.scale_cfg['users_per_tenant']):
user_name = self.tenant_name + "_U" + str(user_count)
user_name = self.tenant_name + "-U" + str(user_count)
user_instance = users.User(user_name,
self,
self.kloud.scale_cfg['keystone_admin_role'])

View File

@ -62,7 +62,7 @@ class User(object):
LOG.info("Creating user: " + self.user_name)
return self.tenant.kloud.keystone.users.create(name=self.user_name,
password=self.user_name,
email="test.com",
email="kloudbuster@localhost",
tenant_id=self.tenant.tenant_id)
def _get_user(self):
@ -132,7 +132,9 @@ class User(object):
config_scale = self.tenant.kloud.scale_cfg
# Find the external network that routers need to attach to
if config_scale['use_floatingip']:
# if redis_server is configured, we need to attach the router to the
# external network in order to reach the redis_server
if config_scale['use_floatingip'] or 'redis_server' in config_scale:
external_network = base_network.find_external_network(self.neutron)
else:
external_network = None
@ -142,7 +144,7 @@ class User(object):
router_instance = base_network.Router(self.neutron, self.nova, self.user_name,
self.tenant.kloud.shared_network)
self.router_list.append(router_instance)
router_name = self.user_name + "_R" + str(router_count)
router_name = self.user_name + "-R" + str(router_count)
# Create the router and also attach it to external network
router_instance.create_router(router_name, external_network)
# Now create the network resources inside the router

View File

@ -1,4 +1,4 @@
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -18,47 +18,29 @@ import re
import log as logging
from perf_tool import PerfTool
import sshutils
LOG = logging.getLogger(__name__)
class WrkTool(PerfTool):
def __init__(self, instance, perf_tool_path):
PerfTool.__init__(self, 'wrk-4.0.1', perf_tool_path, instance)
def __init__(self, instance):
PerfTool.__init__(self, 'wrk-4.0.1', instance)
def get_server_launch_cmd(self):
'''This requires HTTP server is running already
def cmd_run_client(self, target_url, threads, connections,
timeout=5, connetion_type='Keep-alive', retry_count=10):
'''
return None
def run_client(self, target_url, threads, connections,
timeout=5, connetion_type='New', retry_count=10):
'''Run the test
:return: list containing one or more dictionary results
Return the command for running the benchmarking tool
'''
duration_sec = self.instance.get_cmd_duration()
# boost_cmd = self.get_boost_client_cmd()
# cmd = 'sudo sh -c "' + boost_cmd + ' && exec su $LOGNAME -c \''
duration_sec = self.instance.config.exec_time
cmd = '%s -t%d -c%d -d%ds --timeout %ds --latency %s' % \
(self.dest_path, threads, connections, duration_sec, timeout, target_url)
# cmd += ' && exit\'"'
LOG.kbdebug("[%s] Measuring HTTP performance..." %
self.instance.vm_name)
LOG.kbdebug("[%s] %s" % (self.instance.vm_name, cmd))
try:
# force the timeout value with 20 seconds extra for the command to
# complete and do not collect CPU
(_, cmd_out, _) = self.instance.exec_command(cmd, duration_sec + 20)
except sshutils.SSHError as exc:
# Timeout or any SSH error
LOG.error("SSH Error: " + str(exc))
return [self.parse_error(str(exc))]
return cmd
def cmd_parser_run_client(self, status, stdout, stderr):
if status:
return [self.parse_error(stderr)]
# Sample Output:
# Running 10s test @ http://192.168.1.1/index.html
# 8 threads and 5000 connections
@ -75,16 +57,15 @@ class WrkTool(PerfTool):
# Non-2xx or 3xx responses: 828
# Requests/sec: 6080.66
# Transfer/sec: 282.53MB
try:
total_req_str = r'(\d+)\srequests\sin'
http_total_req = re.search(total_req_str, cmd_out).group(1)
http_total_req = re.search(total_req_str, stdout).group(1)
re_str = r'Requests/sec:\s+(\d+\.\d+)'
http_rps = re.search(re_str, cmd_out).group(1)
http_rps = re.search(re_str, stdout).group(1)
re_str = r'Transfer/sec:\s+(\d+\.\d+.B)'
http_rates_kbytes = re.search(re_str, cmd_out).group(1)
http_rates_kbytes = re.search(re_str, stdout).group(1)
# Uniform in unit MB
ex_unit = 'KMG'.find(http_rates_kbytes[-2])
if ex_unit == -1:
@ -93,7 +74,7 @@ class WrkTool(PerfTool):
http_rates_kbytes = float(val * (1024 ** (ex_unit)))
re_str = r'Socket errors: connect (\d+), read (\d+), write (\d+), timeout (\d+)'
http_sock_err = re.search(re_str, cmd_out)
http_sock_err = re.search(re_str, stdout)
if http_sock_err:
v1 = int(http_sock_err.group(1))
v2 = int(http_sock_err.group(2))
@ -104,13 +85,13 @@ class WrkTool(PerfTool):
http_sock_err = 0
re_str = r'Non-2xx or 3xx responses: (\d+)'
http_err = re.search(re_str, cmd_out)
http_err = re.search(re_str, stdout)
if http_err:
http_err = http_err.group(1)
else:
http_err = 0
except Exception:
return self.parse_error('Could not parse: %s' % (cmd_out))
return self.parse_error('Could not parse: %s' % (stdout))
return self.parse_results(http_total_req=http_total_req,
http_rps=http_rps,