From b3e7917d6bdc5847bc6eeb4476f70106fcea6df9 Mon Sep 17 00:00:00 2001 From: Michael Napolitano Date: Wed, 11 May 2016 19:16:57 -0400 Subject: [PATCH] Added Multicast Support. Change-Id: I7781bcfb232d677f6243aef20bcc7fa056737101 --- doc/source/usage.rst | 8 +++- vmtp/instance.py | 26 +++++++++++++ vmtp/iperf_tool.py | 13 +++---- vmtp/nuttcp_tool.py | 87 +++++++++++++++++++++++++------------------ vmtp/perf_instance.py | 4 +- vmtp/perf_tool.py | 21 +++++++---- vmtp/vmtp.py | 48 +++++++++++++++++------- 7 files changed, 138 insertions(+), 69 deletions(-) diff --git a/doc/source/usage.rst b/doc/source/usage.rst index 1e86cd9..693cd84 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -16,7 +16,8 @@ VMTP Usage [--tp-tool ] [--availability_zone ] [--hypervisor [:] ] [--inter-node-only] [--same-network-only] - [--protocols ] [--bandwidth ] + [--protocols ] [--multicast ] + [--bandwidth ] [--tcpbuf ] [--udpbuf ] [--reuse_network_name ] [--os-dataplane-network ] @@ -58,7 +59,10 @@ VMTP Usage --inter-node-only only measure inter-node --same-network-only only measure same network --protocols protocols T(TCP), U(UDP), I(ICMP) - default=TUI (all) - --bandwidth + --multicast + bind to multicast address. (implies + --protocols U, --tp-tool nuttcp) + --bandwidth the bandwidth limit for TCP/UDP flows in K/M/Gbps, e.g. 128K/32M/5G. (default=no limit) --tcpbuf diff --git a/vmtp/instance.py b/vmtp/instance.py index 6d36f25..9949177 100644 --- a/vmtp/instance.py +++ b/vmtp/instance.py @@ -44,6 +44,7 @@ class Instance(object): self.instance = None self.ssh = None self.port = None + self.created_multicast_route = False if config.gmond_svr_ip: self.gmond_svr = config.gmond_svr_ip else: @@ -195,6 +196,29 @@ class Instance(object): else: return True + def add_multicast_route(self, ifname=None): + if not ifname: + cmd = "route -n | grep 'UG[ \t]' | awk '{print $8}'" # name of the default interface + (status, ifname, err) = self.ssh.execute(cmd, timeout=10) + if status == 127 or ifname[0:5] == "usage": + cmd = "netstat -nr | grep default | awk '{print $6}'" + (status, ifname, err) = self.ssh.execute(cmd, timeout=10) + + cmd = "ip route add 224.0.0.0/4 dev " + ifname + (status, _, _) = self.ssh.execute(cmd, timeout=10) + if status == 127: + cmd = "route add -net 224.0.0.0/4 dev " + ifname + (status, _, _) = self.ssh.execute(cmd, timeout=10) + self.created_multicast_route = status == 0 + return status + + def del_multicast_route(self): + (status, _, _) = self.ssh.execute("ip route delete 224.0.0.0/4", timeout=10) + if status == 127: + status = self.ssh.execute("route delete -net 224.0.0.0/4", timeout=10) + return status + + # Set the interface IP address and mask def set_interface_ip(self, if_name, ip, mask): self.buginf('Setting interface %s to %s mask %s', if_name, ip, mask) @@ -284,6 +308,8 @@ class Instance(object): # Delete the server instance # Dispose the ssh session def dispose(self): + if self.created_multicast_route: + self.del_multicast_route() if self.ssh_ip_id: self.net.delete_floating_ip(self.ssh_ip_id) self.buginf('Floating IP %s deleted', self.ssh_access.host) diff --git a/vmtp/iperf_tool.py b/vmtp/iperf_tool.py index eeb9d0b..610af1b 100644 --- a/vmtp/iperf_tool.py +++ b/vmtp/iperf_tool.py @@ -52,12 +52,12 @@ class IperfTool(PerfTool): # Get list of protocols and packet sizes to measure (proto_list, proto_pkt_sizes) = self.get_proto_profile() - for udp, pkt_size_list in zip(proto_list, proto_pkt_sizes): + for proto, pkt_size_list in zip(proto_list, proto_pkt_sizes): # bidirectional is not supported for udp # (need to find the right iperf options to make it work as there are # issues for the server to send back results to the client in reverse # direction - if udp: + if proto == 'UDP': bidir = False loop_count = 1 else: @@ -69,7 +69,7 @@ class IperfTool(PerfTool): res = self.run_client_dir(target_ip, mss, bandwidth_kbps=bandwidth, bidirectional=bidir, - udp=udp, + protocol=proto, length=pkt_size) # for bidirectional the function returns a list of 2 results res_list.extend(res) @@ -79,7 +79,7 @@ class IperfTool(PerfTool): mss, bidirectional=False, bandwidth_kbps=0, - udp=False, + protocol='TCP', length=0, no_cpu_timed=0): '''Run client for given protocol and packet size @@ -97,8 +97,7 @@ class IperfTool(PerfTool): # scaling is normally enabled by default so setting explicit window # size is not going to help achieve better results) opts = '' - protocol = 'UDP' if udp else 'TCP' - + udp = protocol == "UDP" # run iperf client using the default TCP window size (tcp window # scaling is normally enabled by default so setting explicit window # size is not going to help achieve better results) @@ -116,7 +115,7 @@ class IperfTool(PerfTool): # for UDP if the bandwidth is not provided we need to calculate # the optimal bandwidth if not bandwidth_kbps: - udp_res = self.find_udp_bdw(length, target_ip) + udp_res = self.find_bdw(length, target_ip, protocol) if 'error' in udp_res: return [udp_res] if not self.instance.gmond_svr: diff --git a/vmtp/nuttcp_tool.py b/vmtp/nuttcp_tool.py index 0eded39..d71d748 100644 --- a/vmtp/nuttcp_tool.py +++ b/vmtp/nuttcp_tool.py @@ -43,16 +43,15 @@ class NuttcpTool(PerfTool): # Get list of protocols and packet sizes to measure (proto_list, proto_pkt_sizes) = self.get_proto_profile() - - for udp, pkt_size_list in zip(proto_list, proto_pkt_sizes): + for proto, pkt_size_list in zip(proto_list, proto_pkt_sizes): for pkt_size in pkt_size_list: for reverse_dir in reverse_dir_list: # nuttcp does not support reverse dir for UDP... - if reverse_dir and udp: + if reverse_dir and proto != "TCP": continue - if udp: - self.instance.display('Measuring UDP Throughput (packet size=%d)...', - pkt_size) + if proto != "TCP": + self.instance.display('Measuring %s Throughput (packet size=%d)...', + proto, pkt_size) loop_count = 1 else: # For accuracy purpose, TCP throughput will be measured 3 times @@ -63,45 +62,48 @@ class NuttcpTool(PerfTool): res = self.run_client_dir(target_ip, mss, reverse_dir=reverse_dir, bandwidth_kbps=bandwidth, - udp=udp, + protocol=proto, length=pkt_size) res_list.extend(res) # For UDP reverse direction we need to start the server on self.instance # and run the client on target_instance - if bidirectional and 'U' in self.instance.config.protocols: - # Start the server on the client (this tool instance) - self.instance.display('Start UDP server for reverse dir') - if self.start_server(): - # Start the client on the target instance - target_instance.display('Starting UDP client for reverse dir') + if bidirectional: + for proto in proto_list: + if proto == 'TCP': + continue + # Start the server on the client (this tool instance) + self.instance.display('Start ' + proto + ' server for reverse dir') + if self.start_server(): + # Start the client on the target instance + target_instance.display('Starting ' + proto + ' client for reverse dir') - for pkt_size in self.instance.config.udp_pkt_sizes: - self.instance.display('Measuring UDP Throughput packet size=%d' - ' (reverse direction)...', - pkt_size) - res = target_instance.tp_tool.run_client_dir(self.instance.internal_ip, - mss, - bandwidth_kbps=bandwidth, - udp=True, - length=pkt_size) - res[0]['direction'] = 'reverse' - res_list.extend(res) - else: - self.instance.display('Failed to start UDP server for reverse dir') + for pkt_size in self.instance.config.udp_pkt_sizes: + self.instance.display('Measuring %s Throughput packet size=%d' + ' (reverse direction)...', + proto, pkt_size) + res = target_instance.tp_tool.run_client_dir(self.instance.internal_ip, + mss, + bandwidth_kbps=bandwidth, + protocol=proto, + length=pkt_size) + res[0]['direction'] = 'reverse' + res_list.extend(res) + else: + self.instance.display('Failed to start ' + proto + ' server for reverse dir') return res_list def run_client_dir(self, target_ip, mss, reverse_dir=False, bandwidth_kbps=0, - udp=False, + protocol='TCP', length=0, no_cpu_timed=0): '''Run client in one direction :param reverse_dir: True if reverse the direction (tcp only for now) :param bandwidth_kbps: transmit rate limit in Kbps - :param udp: if true get UDP throughput, else get TCP throughput + :param protocol: (TCP|UDP|Multicast) :param length: length of network write|read buf (default 1K|8K/udp, 64K/tcp) for udp is the packet size :param no_cpu_timed: if non zero will disable cpu collection and override @@ -114,8 +116,9 @@ class NuttcpTool(PerfTool): # scaling is normally enabled by default so setting explicit window # size is not going to help achieve better results) opts = '' - protocol = 'UDP' if udp else 'TCP' - + multicast = protocol == 'Multicast' + tcp = protocol == 'TCP' + udp = protocol == 'UDP' if mss: opts += "-M" + str(mss) if reverse_dir: @@ -124,12 +127,14 @@ class NuttcpTool(PerfTool): opts += " -l" + str(length) if self.instance.config.ipv6_mode: opts += " -6 " - if udp: + if multicast: + opts += " -m32 -o -j -g" + self.instance.config.multicast_addr + if not tcp: opts += " -u" # for UDP if the bandwidth is not provided we need to calculate # the optimal bandwidth if not bandwidth_kbps: - udp_res = self.find_udp_bdw(length, target_ip) + udp_res = self.find_bdw(length, target_ip, protocol) if 'error' in udp_res: return [udp_res] if not self.instance.gmond_svr: @@ -164,27 +169,35 @@ class NuttcpTool(PerfTool): self.instance.display('SSH Error:' + str(exc)) return [self.parse_error(protocol, str(exc))] - if udp: - # UDP output (unicast and multicast): + if udp or multicast: + # UDP output: # megabytes=1.1924 real_seconds=10.01 rate_Mbps=0.9997 tx_cpu=99 rx_cpu=0 # drop=0 pkt=1221 data_loss=0.00000 re_udp = r'rate_Mbps=([\d\.]*) tx_cpu=\d* rx_cpu=\d* drop=(\-*\d*) pkt=(\d*)' + if multicast: + re_udp += r' data_loss=[\d\.]* msmaxjitter=([\d\.]*) msavgOWD=([\-\d\.]*)' match = re.search(re_udp, cmd_out) if match: rate_mbps = float(match.group(1)) drop = float(match.group(2)) pkt = int(match.group(3)) + jitter = None + + if multicast: + jitter = float(match.group(4)) + # Workaround for a bug of nuttcp that sometimes it will return a # negative number for drop. if drop < 0: drop = 0 - return [self.parse_results('UDP', + return [self.parse_results(protocol, int(rate_mbps * 1024), lossrate=round(drop * 100 / pkt, 2), reverse_dir=reverse_dir, msg_size=length, - cpu_load=cpu_load)] + cpu_load=cpu_load, + jitter=jitter)] else: # TCP output: # megabytes=1083.4252 real_seconds=10.04 rate_Mbps=905.5953 tx_cpu=3 rx_cpu=19 @@ -195,7 +208,7 @@ class NuttcpTool(PerfTool): rate_mbps = float(match.group(1)) retrans = int(match.group(2)) rtt_ms = float(match.group(3)) - return [self.parse_results('TCP', + return [self.parse_results(protocol, int(rate_mbps * 1024), retrans=retrans, rtt_ms=rtt_ms, diff --git a/vmtp/perf_instance.py b/vmtp/perf_instance.py index 544b3bc..81c7573 100644 --- a/vmtp/perf_instance.py +++ b/vmtp/perf_instance.py @@ -54,6 +54,7 @@ class PerfInstance(Instance): return False if self.tp_tool and not self.tp_tool.install(): return False + self.add_multicast_route() if not self.is_server: return True if self.tp_tool and not self.tp_tool.start_server(): @@ -96,7 +97,8 @@ class PerfInstance(Instance): res['az_to'] = az_to res['distro_id'] = self.ssh.distro_id res['distro_version'] = self.ssh.distro_version - + if 'multicast_addr' in self.config: + res['multicast_address'] = self.config.multicast_addr # consolidate results for all tools if ping_res: tp_tool_res.append(ping_res) diff --git a/vmtp/perf_tool.py b/vmtp/perf_tool.py index 7fa427e..59f2c9c 100644 --- a/vmtp/perf_tool.py +++ b/vmtp/perf_tool.py @@ -81,7 +81,7 @@ class PerfTool(object): def parse_results(self, protocol, throughput, lossrate=None, retrans=None, rtt_ms=None, reverse_dir=False, msg_size=None, - cpu_load=None): + cpu_load=None, jitter=None): res = {'throughput_kbps': throughput, 'protocol': protocol, 'tool': self.name} @@ -99,6 +99,8 @@ class PerfTool(object): res['pkt_size'] = msg_size if cpu_load: res['cpu_load'] = cpu_load + if jitter: + res['jitter'] = jitter return res @abc.abstractmethod @@ -106,13 +108,13 @@ class PerfTool(object): mss, reverse_dir=False, bandwidth_kbps=0, - udp=False, + protocol="TCP", length=0, no_cpu_timed=0): # must be implemented by sub classes return None - def find_udp_bdw(self, pkt_size, target_ip): + def find_bdw(self, pkt_size, target_ip, protocol="UDP"): '''Find highest UDP bandwidth within max loss rate for given packet size :return: a dictionary describing the optimal bandwidth (see parse_results()) ''' @@ -154,7 +156,7 @@ class PerfTool(object): # stop if the remaining range to cover is less than 5% while (min_kbps * 100 / max_kbps) < 95: res_list = self.run_client_dir(target_ip, 0, bandwidth_kbps=kbps, - udp=True, length=pkt_size, + protocol=protocol, length=pkt_size, no_cpu_timed=1) # always pick the first element in the returned list of dict(s) # should normally only have 1 element @@ -204,14 +206,17 @@ class PerfTool(object): '''Return a tuple containing the list of protocols (tcp/udp) and list of packet sizes (udp only) ''' - # start with TCP (udp=False) then UDP + # start with TCP (protocol="TCP") then UDP proto_list = [] proto_pkt_sizes = [] if 'T' in self.instance.config.protocols: - proto_list.append(False) + proto_list.append('TCP') proto_pkt_sizes.append(self.instance.config.tcp_pkt_sizes) if 'U' in self.instance.config.protocols: - proto_list.append(True) + proto_list.append('UDP') + proto_pkt_sizes.append(self.instance.config.udp_pkt_sizes) + if 'M' in self.instance.config.protocols: + proto_list.append('Multicast') proto_pkt_sizes.append(self.instance.config.udp_pkt_sizes) return (proto_list, proto_pkt_sizes) @@ -282,7 +287,7 @@ class PingTool(PerfTool): mss, reverse_dir=False, bandwidth_kbps=0, - udp=False, + protocol="TCP", length=0, no_cpu_timed=0): # not applicable diff --git a/vmtp/vmtp.py b/vmtp/vmtp.py index ac28ee7..de0bfd6 100755 --- a/vmtp/vmtp.py +++ b/vmtp/vmtp.py @@ -517,7 +517,7 @@ def get_controller_info(ssh_access, net, res_col, retry_count): def gen_report_data(proto, result): try: - if proto in ['TCP', 'UDP', 'ICMP']: + if proto in ['TCP', 'UDP', 'Multicast', 'ICMP']: result = [x for x in result if x['protocol'] == proto] elif proto == 'Upload': result = [x for x in result if ('direction' not in x) and (x['protocol'] == 'TCP')] @@ -528,7 +528,7 @@ def gen_report_data(proto, result): if proto in ['TCP', 'Upload', 'Download']: tcp_test_count = 0 retval = {'tp_kbps': 0, 'rtt_ms': 0} - elif proto == 'UDP': + elif proto == 'UDP' or proto == 'Multicast': pkt_size_list = [x['pkt_size'] for x in result] retval = dict(zip(pkt_size_list, [{}, {}, {}])) @@ -537,9 +537,11 @@ def gen_report_data(proto, result): tcp_test_count = tcp_test_count + 1 retval['tp_kbps'] += item['throughput_kbps'] retval['rtt_ms'] += item['rtt_ms'] - elif proto == 'UDP': + elif proto == 'UDP' or proto == 'Multicast': retval[item['pkt_size']]['tp_kbps'] = item['throughput_kbps'] retval[item['pkt_size']]['loss_rate'] = item['loss_rate'] + if 'jitter' in item: + retval[item['pkt_size']]['jitter'] = item['jitter'] elif proto == 'ICMP': for key in ['rtt_avg_ms', 'rtt_max_ms', 'rtt_min_ms', 'rtt_stddev']: retval[key] = item[key] @@ -560,9 +562,9 @@ def print_report(results): SPASS = "\033[92mPASSED\033[0m" SFAIL = "\033[91mFAILED\033[0m" - # Initilize a run_status[4][2][2][3] array - run_status = [([([(["SKIPPED"] * 3) for i in range(2)]) for i in range(2)]) for i in range(4)] - run_data = [([([([{}] * 3) for i in range(2)]) for i in range(2)]) for i in range(4)] + # Initilize a run_status[4][2][2][4] array + run_status = [([([(["SKIPPED"] * 4) for i in range(2)]) for i in range(2)]) for i in range(4)] + run_data = [([([([{}] * 4) for i in range(2)]) for i in range(2)]) for i in range(4)] flows = results['flows'] for flow in flows: res = flow['results'] @@ -585,7 +587,7 @@ def print_report(results): idx0 = 3 idx1 = idx2 = 0 for item in res: - for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']): + for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']): if (item['protocol'] == proto) and (run_status[idx0][idx1][idx2][idx3] != SFAIL): if 'error' in item: run_status[idx0][idx1][idx2][idx3] = SFAIL @@ -600,15 +602,15 @@ def print_report(results): if net == 'Same Network' and ip == 'Floating IP': continue for idx2, node in enumerate(['Intra-node', 'Inter-node']): - for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']): - row = [str(scenario / 3 + 1) + "." + str(idx3 + 1), + for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']): + row = [str(scenario / 4 + 1) + "." + str(idx3 + 1), "%s, %s, %s, %s" % (net, ip, node, proto), run_status[idx0][idx1][idx2][idx3], run_data[idx0][idx1][idx2][idx3]] table.append(row) scenario = scenario + 1 - for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP']): - row = [str(scenario / 3 + 1) + "." + str(idx3 + 1), + for idx3, proto in enumerate(['TCP', 'UDP', 'ICMP', 'Multicast']): + row = [str(scenario / 4 + 1) + "." + str(idx3 + 1), "Native Throughput, %s" % (proto), run_status[3][0][0][idx3], run_data[3][0][0][idx3]] table.append(row) @@ -756,8 +758,16 @@ def parse_opts_from_cli(): parser.add_argument('--protocols', dest='protocols', action='store', default='TUI', - help='protocols T(TCP), U(UDP), I(ICMP) - default=TUI (all)', - metavar='') + help='protocols T(TCP), U(UDP), I(ICMP), M(Multicast)' + ' - default=TUI (TUIM if --multicast_addr is passed)', + metavar='') + + parser.add_argument('--multicast_addr', dest='multicast_addr', + action='store', + help='bind to multicast address for tests ' + '(implies --protocols M[...], --tp-tool nuttcp )', + metavar='') + parser.add_argument('--bandwidth', dest='vm_bandwidth', action='store', @@ -928,6 +938,7 @@ def merge_opts_to_configs(opts): sys.exit(1) config.vm_bandwidth = int(val * (10 ** (ex_unit * 3))) + # the pkt size for TCP and UDP if opts.tcp_pkt_sizes: try: @@ -987,7 +998,16 @@ def merge_opts_to_configs(opts): # Check the tp-tool name config.protocols = opts.protocols.upper() - if 'T' in config.protocols or 'U' in config.protocols: + if 'M' in config.protocols or opts.multicast_addr: + # nuttcp required for multicast + opts.tp_tool = 'nuttcp' + config.tp_tool = nuttcp_tool.NuttcpTool + # If M provided, but not multicast_addr, use default (231.1.1.1) + config.multicast_addr = opts.multicast_addr if opts.multicast_addr else "231.1.1.1" + # If --multicast_addr provided, ensure 'M' is in protocols. + if 'M' not in config.protocols: + config.protocols += 'M' + elif 'T' in config.protocols or 'U' in config.protocols: if opts.tp_tool.lower() == 'nuttcp': config.tp_tool = nuttcp_tool.NuttcpTool elif opts.tp_tool.lower() == 'iperf':