Added Multicast Support.

Change-Id: I7781bcfb232d677f6243aef20bcc7fa056737101
This commit is contained in:
Michael Napolitano 2016-05-11 19:16:57 -04:00
parent 3e3be62ba4
commit b3e7917d6b
7 changed files with 138 additions and 69 deletions

View File

@ -16,7 +16,8 @@ VMTP Usage
[--tp-tool <nuttcp|iperf>]
[--availability_zone <availability_zone>] [--hypervisor [<az>:]
<hostname>] [--inter-node-only] [--same-network-only]
[--protocols <T|U|I>] [--bandwidth <bandwidth>]
[--protocols <T|U|I>] [--multicast <multicast_address>]
[--bandwidth <bandwidth>]
[--tcpbuf <tcp_pkt_size1,...>] [--udpbuf <udp_pkt_size1,...>]
[--reuse_network_name <network_name>]
[--os-dataplane-network <network_name>]
@ -58,7 +59,10 @@ VMTP Usage
--inter-node-only only measure inter-node
--same-network-only only measure same network
--protocols <T|U|I> protocols T(TCP), U(UDP), I(ICMP) - default=TUI (all)
--bandwidth <bandwidth>
--multicast <multicast_address>
bind to multicast address. (implies
--protocols U, --tp-tool nuttcp)
--bandwidth <bandwidth>
the bandwidth limit for TCP/UDP flows in K/M/Gbps,
e.g. 128K/32M/5G. (default=no limit)
--tcpbuf <tcp_pkt_size1,...>

View File

@ -44,6 +44,7 @@ class Instance(object):
self.instance = None
self.ssh = None
self.port = None
self.created_multicast_route = False
if config.gmond_svr_ip:
self.gmond_svr = config.gmond_svr_ip
else:
@ -195,6 +196,29 @@ class Instance(object):
else:
return True
def add_multicast_route(self, ifname=None):
if not ifname:
cmd = "route -n | grep 'UG[ \t]' | awk '{print $8}'" # name of the default interface
(status, ifname, err) = self.ssh.execute(cmd, timeout=10)
if status == 127 or ifname[0:5] == "usage":
cmd = "netstat -nr | grep default | awk '{print $6}'"
(status, ifname, err) = self.ssh.execute(cmd, timeout=10)
cmd = "ip route add 224.0.0.0/4 dev " + ifname
(status, _, _) = self.ssh.execute(cmd, timeout=10)
if status == 127:
cmd = "route add -net 224.0.0.0/4 dev " + ifname
(status, _, _) = self.ssh.execute(cmd, timeout=10)
self.created_multicast_route = status == 0
return status
def del_multicast_route(self):
(status, _, _) = self.ssh.execute("ip route delete 224.0.0.0/4", timeout=10)
if status == 127:
status = self.ssh.execute("route delete -net 224.0.0.0/4", timeout=10)
return status
# Set the interface IP address and mask
def set_interface_ip(self, if_name, ip, mask):
self.buginf('Setting interface %s to %s mask %s', if_name, ip, mask)
@ -284,6 +308,8 @@ class Instance(object):
# Delete the server instance
# Dispose the ssh session
def dispose(self):
if self.created_multicast_route:
self.del_multicast_route()
if self.ssh_ip_id:
self.net.delete_floating_ip(self.ssh_ip_id)
self.buginf('Floating IP %s deleted', self.ssh_access.host)

View File

@ -52,12 +52,12 @@ class IperfTool(PerfTool):
# Get list of protocols and packet sizes to measure
(proto_list, proto_pkt_sizes) = self.get_proto_profile()
for udp, pkt_size_list in zip(proto_list, proto_pkt_sizes):
for proto, pkt_size_list in zip(proto_list, proto_pkt_sizes):
# bidirectional is not supported for udp
# (need to find the right iperf options to make it work as there are
# issues for the server to send back results to the client in reverse
# direction
if udp:
if proto == 'UDP':
bidir = False
loop_count = 1
else:
@ -69,7 +69,7 @@ class IperfTool(PerfTool):
res = self.run_client_dir(target_ip, mss,
bandwidth_kbps=bandwidth,
bidirectional=bidir,
udp=udp,
protocol=proto,
length=pkt_size)
# for bidirectional the function returns a list of 2 results
res_list.extend(res)
@ -79,7 +79,7 @@ class IperfTool(PerfTool):
mss,
bidirectional=False,
bandwidth_kbps=0,
udp=False,
protocol='TCP',
length=0,
no_cpu_timed=0):
'''Run client for given protocol and packet size
@ -97,8 +97,7 @@ class IperfTool(PerfTool):
# scaling is normally enabled by default so setting explicit window
# size is not going to help achieve better results)
opts = ''
protocol = 'UDP' if udp else 'TCP'
udp = protocol == "UDP"
# run iperf client using the default TCP window size (tcp window
# scaling is normally enabled by default so setting explicit window
# size is not going to help achieve better results)
@ -116,7 +115,7 @@ class IperfTool(PerfTool):
# for UDP if the bandwidth is not provided we need to calculate
# the optimal bandwidth
if not bandwidth_kbps:
udp_res = self.find_udp_bdw(length, target_ip)
udp_res = self.find_bdw(length, target_ip, protocol)
if 'error' in udp_res:
return [udp_res]
if not self.instance.gmond_svr:

View File

@ -43,16 +43,15 @@ class NuttcpTool(PerfTool):
# Get list of protocols and packet sizes to measure
(proto_list, proto_pkt_sizes) = self.get_proto_profile()
for udp, pkt_size_list in zip(proto_list, proto_pkt_sizes):
for proto, pkt_size_list in zip(proto_list, proto_pkt_sizes):
for pkt_size in pkt_size_list:
for reverse_dir in reverse_dir_list:
# nuttcp does not support reverse dir for UDP...
if reverse_dir and udp:
if reverse_dir and proto != "TCP":
continue
if udp:
self.instance.display('Measuring UDP Throughput (packet size=%d)...',
pkt_size)
if proto != "TCP":
self.instance.display('Measuring %s Throughput (packet size=%d)...',
proto, pkt_size)
loop_count = 1
else:
# For accuracy purpose, TCP throughput will be measured 3 times
@ -63,45 +62,48 @@ class NuttcpTool(PerfTool):
res = self.run_client_dir(target_ip, mss,
reverse_dir=reverse_dir,
bandwidth_kbps=bandwidth,
udp=udp,
protocol=proto,
length=pkt_size)
res_list.extend(res)
# For UDP reverse direction we need to start the server on self.instance
# and run the client on target_instance
if bidirectional and 'U' in self.instance.config.protocols:
# Start the server on the client (this tool instance)
self.instance.display('Start UDP server for reverse dir')
if self.start_server():
# Start the client on the target instance
target_instance.display('Starting UDP client for reverse dir')
if bidirectional:
for proto in proto_list:
if proto == 'TCP':
continue
# Start the server on the client (this tool instance)
self.instance.display('Start ' + proto + ' server for reverse dir')
if self.start_server():
# Start the client on the target instance
target_instance.display('Starting ' + proto + ' client for reverse dir')
for pkt_size in self.instance.config.udp_pkt_sizes:
self.instance.display('Measuring UDP Throughput packet size=%d'
' (reverse direction)...',
pkt_size)
res = target_instance.tp_tool.run_client_dir(self.instance.internal_ip,
mss,
bandwidth_kbps=bandwidth,
udp=True,
length=pkt_size)
res[0]['direction'] = 'reverse'
res_list.extend(res)
else:
self.instance.display('Failed to start UDP server for reverse dir')
for pkt_size in self.instance.config.udp_pkt_sizes:
self.instance.display('Measuring %s Throughput packet size=%d'
' (reverse direction)...',
proto, pkt_size)
res = target_instance.tp_tool.run_client_dir(self.instance.internal_ip,
mss,
bandwidth_kbps=bandwidth,
protocol=proto,
length=pkt_size)
res[0]['direction'] = 'reverse'
res_list.extend(res)
else:
self.instance.display('Failed to start ' + proto + ' server for reverse dir')
return res_list
def run_client_dir(self, target_ip,
mss,
reverse_dir=False,
bandwidth_kbps=0,
udp=False,
protocol='TCP',
length=0,
no_cpu_timed=0):
'''Run client in one direction
:param reverse_dir: True if reverse the direction (tcp only for now)
:param bandwidth_kbps: transmit rate limit in Kbps
:param udp: if true get UDP throughput, else get TCP throughput
:param protocol: (TCP|UDP|Multicast)
:param length: length of network write|read buf (default 1K|8K/udp, 64K/tcp)
for udp is the packet size
:param no_cpu_timed: if non zero will disable cpu collection and override
@ -114,8 +116,9 @@ class NuttcpTool(PerfTool):
# scaling is normally enabled by default so setting explicit window
# size is not going to help achieve better results)
opts = ''
protocol = 'UDP' if udp else 'TCP'
multicast = protocol == 'Multicast'
tcp = protocol == 'TCP'
udp = protocol == 'UDP'
if mss:
opts += "-M" + str(mss)
if reverse_dir:
@ -124,12 +127,14 @@ class NuttcpTool(PerfTool):
opts += " -l" + str(length)
if self.instance.config.ipv6_mode:
opts += " -6 "
if udp:
if multicast:
opts += " -m32 -o -j -g" + self.instance.config.multicast_addr
if not tcp:
opts += " -u"
# for UDP if the bandwidth is not provided we need to calculate
# the optimal bandwidth
if not bandwidth_kbps:
udp_res = self.find_udp_bdw(length, target_ip)
udp_res = self.find_bdw(length, target_ip, protocol)
if 'error' in udp_res:
return [udp_res]
if not self.instance.gmond_svr:
@ -164,27 +169,35 @@ class NuttcpTool(PerfTool):
self.instance.display('SSH Error:' + str(exc))
return [self.parse_error(protocol, str(exc))]
if udp:
# UDP output (unicast and multicast):
if udp or multicast:
# UDP output:
# megabytes=1.1924 real_seconds=10.01 rate_Mbps=0.9997 tx_cpu=99 rx_cpu=0
# drop=0 pkt=1221 data_loss=0.00000
re_udp = r'rate_Mbps=([\d\.]*) tx_cpu=\d* rx_cpu=\d* drop=(\-*\d*) pkt=(\d*)'
if multicast:
re_udp += r' data_loss=[\d\.]* msmaxjitter=([\d\.]*) msavgOWD=([\-\d\.]*)'
match = re.search(re_udp, cmd_out)
if match:
rate_mbps = float(match.group(1))
drop = float(match.group(2))
pkt = int(match.group(3))
jitter = None
if multicast:
jitter = float(match.group(4))
# Workaround for a bug of nuttcp that sometimes it will return a
# negative number for drop.
if drop < 0:
drop = 0
return [self.parse_results('UDP',
return [self.parse_results(protocol,
int(rate_mbps * 1024),
lossrate=round(drop * 100 / pkt, 2),
reverse_dir=reverse_dir,
msg_size=length,
cpu_load=cpu_load)]
cpu_load=cpu_load,
jitter=jitter)]
else:
# TCP output:
# megabytes=1083.4252 real_seconds=10.04 rate_Mbps=905.5953 tx_cpu=3 rx_cpu=19
@ -195,7 +208,7 @@ class NuttcpTool(PerfTool):
rate_mbps = float(match.group(1))
retrans = int(match.group(2))
rtt_ms = float(match.group(3))
return [self.parse_results('TCP',
return [self.parse_results(protocol,
int(rate_mbps * 1024),
retrans=retrans,
rtt_ms=rtt_ms,

View File

@ -54,6 +54,7 @@ class PerfInstance(Instance):
return False
if self.tp_tool and not self.tp_tool.install():
return False
self.add_multicast_route()
if not self.is_server:
return True
if self.tp_tool and not self.tp_tool.start_server():
@ -96,7 +97,8 @@ class PerfInstance(Instance):
res['az_to'] = az_to
res['distro_id'] = self.ssh.distro_id
res['distro_version'] = self.ssh.distro_version
if 'multicast_addr' in self.config:
res['multicast_address'] = self.config.multicast_addr
# consolidate results for all tools
if ping_res:
tp_tool_res.append(ping_res)

View File

@ -81,7 +81,7 @@ class PerfTool(object):
def parse_results(self, protocol, throughput, lossrate=None, retrans=None,
rtt_ms=None, reverse_dir=False,
msg_size=None,
cpu_load=None):
cpu_load=None, jitter=None):
res = {'throughput_kbps': throughput,
'protocol': protocol,
'tool': self.name}
@ -99,6 +99,8 @@ class PerfTool(object):
res['pkt_size'] = msg_size
if cpu_load:
res['cpu_load'] = cpu_load
if jitter:
res['jitter'] = jitter
return res
@abc.abstractmethod
@ -106,13 +108,13 @@ class PerfTool(object):
mss,
reverse_dir=False,
bandwidth_kbps=0,
udp=False,
protocol="TCP",
length=0,
no_cpu_timed=0):
# must be implemented by sub classes
return None
def find_udp_bdw(self, pkt_size, target_ip):
def find_bdw(self, pkt_size, target_ip, protocol="UDP"):
'''Find highest UDP bandwidth within max loss rate for given packet size
:return: a dictionary describing the optimal bandwidth (see parse_results())
'''
@ -154,7 +156,7 @@ class PerfTool(object):
# stop if the remaining range to cover is less than 5%
while (min_kbps * 100 / max_kbps) < 95:
res_list = self.run_client_dir(target_ip, 0, bandwidth_kbps=kbps,
udp=True, length=pkt_size,
protocol=protocol, length=pkt_size,
no_cpu_timed=1)
# always pick the first element in the returned list of dict(s)
# should normally only have 1 element
@ -204,14 +206,17 @@ class PerfTool(object):
'''Return a tuple containing the list of protocols (tcp/udp) and
list of packet sizes (udp only)
'''
# start with TCP (udp=False) then UDP
# start with TCP (protocol="TCP") then UDP
proto_list = []
proto_pkt_sizes = []
if 'T' in self.instance.config.protocols:
proto_list.append(False)
proto_list.append('TCP')
proto_pkt_sizes.append(self.instance.config.tcp_pkt_sizes)
if 'U' in self.instance.config.protocols:
proto_list.append(True)
proto_list.append('UDP')
proto_pkt_sizes.append(self.instance.config.udp_pkt_sizes)
if 'M' in self.instance.config.protocols:
proto_list.append('Multicast')
proto_pkt_sizes.append(self.instance.config.udp_pkt_sizes)
return (proto_list, proto_pkt_sizes)
@ -282,7 +287,7 @@ class PingTool(PerfTool):
mss,
reverse_dir=False,
bandwidth_kbps=0,
udp=False,
protocol="TCP",
length=0,
no_cpu_timed=0):
# not applicable

View File

@ -517,7 +517,7 @@ def get_controller_info(ssh_access, net, res_col, retry_count):
def gen_report_data(proto, result):
try:
if proto in ['TCP', 'UDP', 'ICMP']:
if proto in ['TCP', 'UDP', 'Multicast', 'ICMP']:
result = [x for x in result if x['protocol'] == proto]
elif proto == 'Upload':
result = [x for x in result if ('direction' not in x) and (x['protocol'] == 'TCP')]
@ -528,7 +528,7 @@ def gen_report_data(proto, result):
if proto in ['TCP', 'Upload', 'Download']:
tcp_test_count = 0
retval = {'tp_kbps': 0, 'rtt_ms': 0}
elif proto == 'UDP':
elif proto == 'UDP' or proto == 'Multicast':
pkt_size_list = [x['pkt_size'] for x in result]
retval = dict(zip(pkt_size_list, [{}, {}, {}]))
@ -537,9 +537,11 @@ def gen_report_data(proto, result):
tcp_test_count = tcp_test_count + 1
retval['tp_kbps'] += item['throughput_kbps']
retval['rtt_ms'] += item['rtt_ms']
elif proto == 'UDP':
elif proto == 'UDP' or proto == 'Multicast':
retval[item['pkt_size']]['tp_kbps'] = item['throughput_kbps']
retval[item['pkt_size']]['loss_rate'] = item['loss_rate']
if 'jitter' in item:
retval[item['pkt_size']]['jitter'] = item['jitter']
elif proto == 'ICMP':
for key in ['rtt_avg_ms', 'rtt_max_ms', 'rtt_min_ms', 'rtt_stddev']:
retval[key] = item[key]
@ -560,9 +562,9 @@ def print_report(results):
SPASS = "\033[92mPASSED\033[0m"
SFAIL = "\033[91mFAILED\033[0m"
# Initilize a run_status[4][2][2][3] array
run_status = [([([(["SKIPPED"] * 3) for i in range(2)]) for i in range(2)]) for i in range(4)]
run_data = [([([([{}] * 3) for i in range(2)]) for i in range(2)]) for i in range(4)]
# Initilize a run_status[4][2][2][4] array
run_status = [([([(["SKIPPED"] * 4) for i in range(2)]) for i in range(2)]) for i in range(4)]
run_data = [([([([{}] * 4) for i in range(2)]) for i in range(2)]) for i in range(4)]
flows = results['flows']
for flow in flows:
res = flow['results']
@ -585,7 +587,7 @@ def print_report(results):
idx0 = 3
idx1 = idx2 = 0
for item in res:
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']):
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']):
if (item['protocol'] == proto) and (run_status[idx0][idx1][idx2][idx3] != SFAIL):
if 'error' in item:
run_status[idx0][idx1][idx2][idx3] = SFAIL
@ -600,15 +602,15 @@ def print_report(results):
if net == 'Same Network' and ip == 'Floating IP':
continue
for idx2, node in enumerate(['Intra-node', 'Inter-node']):
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']):
row = [str(scenario / 3 + 1) + "." + str(idx3 + 1),
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']):
row = [str(scenario / 4 + 1) + "." + str(idx3 + 1),
"%s, %s, %s, %s" % (net, ip, node, proto),
run_status[idx0][idx1][idx2][idx3],
run_data[idx0][idx1][idx2][idx3]]
table.append(row)
scenario = scenario + 1
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']):
row = [str(scenario / 3 + 1) + "." + str(idx3 + 1),
for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']):
row = [str(scenario / 4 + 1) + "." + str(idx3 + 1),
"Native Throughput, %s" % (proto),
run_status[3][0][0][idx3], run_data[3][0][0][idx3]]
table.append(row)
@ -756,8 +758,16 @@ def parse_opts_from_cli():
parser.add_argument('--protocols', dest='protocols',
action='store',
default='TUI',
help='protocols T(TCP), U(UDP), I(ICMP) - default=TUI (all)',
metavar='<T|U|I>')
help='protocols T(TCP), U(UDP), I(ICMP), M(Multicast)'
' - default=TUI (TUIM if --multicast_addr is passed)',
metavar='<T|U|I|M>')
parser.add_argument('--multicast_addr', dest='multicast_addr',
action='store',
help='bind to multicast address for tests '
'(implies --protocols M[...], --tp-tool nuttcp )',
metavar='<multicast_address>')
parser.add_argument('--bandwidth', dest='vm_bandwidth',
action='store',
@ -928,6 +938,7 @@ def merge_opts_to_configs(opts):
sys.exit(1)
config.vm_bandwidth = int(val * (10 ** (ex_unit * 3)))
# the pkt size for TCP and UDP
if opts.tcp_pkt_sizes:
try:
@ -987,7 +998,16 @@ def merge_opts_to_configs(opts):
# Check the tp-tool name
config.protocols = opts.protocols.upper()
if 'T' in config.protocols or 'U' in config.protocols:
if 'M' in config.protocols or opts.multicast_addr:
# nuttcp required for multicast
opts.tp_tool = 'nuttcp'
config.tp_tool = nuttcp_tool.NuttcpTool
# If M provided, but not multicast_addr, use default (231.1.1.1)
config.multicast_addr = opts.multicast_addr if opts.multicast_addr else "231.1.1.1"
# If --multicast_addr provided, ensure 'M' is in protocols.
if 'M' not in config.protocols:
config.protocols += 'M'
elif 'T' in config.protocols or 'U' in config.protocols:
if opts.tp_tool.lower() == 'nuttcp':
config.tp_tool = nuttcp_tool.NuttcpTool
elif opts.tp_tool.lower() == 'iperf':