Add multicast and broadcast tests

Moved tests from downstream plugin as they are, with necessary
adjustments.
- Changed paths where needed.
- Updated test uuids.
- Moved also relevant config options and base functions.

Change-Id: I509641a72ae87e6aed8582426a09ab87390c72fe
This commit is contained in:
Roman Safronov 2024-02-14 12:54:37 +02:00
parent 7589cc2c37
commit e3ff03e580
5 changed files with 1825 additions and 6 deletions

View File

@ -37,5 +37,37 @@ WhiteboxNeutronPluginOptions = [
cfg.StrOpt('pki_ca_cert',
default='/etc/ipa/ca.crt',
help='File with peer CA certificate. Need for TLS-everywhere '
'environments.')
'environments.'),
cfg.StrOpt('default_instance_interface',
default='eth0',
help='Default first interface name used in VM instances'
'Typical values are eth0, ens5, etc'),
cfg.IntOpt('mcast_groups_count',
default=1,
help='How many groups to use in multicast tests. Default value '
'of 1 is for environments with low resources. '
'Recommended value is 2.'),
cfg.IntOpt('mcast_receivers_count',
default=1,
help='How many receivers to use in multicast tests. Default '
'value of 1 is for environments with low resources. '
'Recommended value is 2.'),
cfg.IntOpt('external_igmp_querier_period',
default=170,
help='Time in seconds external igmp querier is sending its '
'periodical queries'),
cfg.BoolOpt('run_traffic_flow_tests',
default=False,
help='Specify explicitly whether to run traffic flow tests.'
' This is needed because some ML2 plugins (e.g. ovn ) do '
'not expose api_extensions like dvr for some purposes.'),
cfg.IntOpt('broadcast_receivers_count',
default=2,
help='How many receivers to use in broadcast tests. Default '
'and recommended value of 2. In case of environments '
'with low resources, set it to 1.'),
cfg.BoolOpt('bgp',
default=False,
help='Specifies whether the OSP setup under test has been '
'configured with BGP functionality or not')
]

View File

@ -12,19 +12,27 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import random
import re
import time
import netaddr
from neutron_lib import constants
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin.scenario import base
from oslo_log import log
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture
from whitebox_neutron_tempest_plugin.common import utils as local_utils
CONF = config.CONF
LOG = log.getLogger(__name__)
WB_CONF = config.CONF.whitebox_neutron_plugin_options
WB_CONF = CONF.whitebox_neutron_plugin_options
class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
@ -39,6 +47,12 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
cls.image_ref = CONF.compute.image_ref
cls.flavor_ref = CONF.compute.flavor_ref
cls.username = CONF.validation.image_ssh_user
agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
ovn_agents = [agent for agent in agents if 'ovn' in agent['binary']]
cls.has_ovn_support = True if ovn_agents else False
sriov_agents = [
agent for agent in agents if 'sriov' in agent['binary']]
cls.has_sriov_support = True if sriov_agents else False
@classmethod
def run_on_master_controller(cls, cmd):
@ -53,15 +67,292 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
server_details = self.os_admin.servers_client.show_server(server_id)
return server_details['server']['OS-EXT-SRV-ATTR:host']
@classmethod
def get_external_gateway(cls):
if CONF.network.public_network_id:
subnets = cls.os_admin.network_client.list_subnets(
network_id=CONF.network.public_network_id)
for subnet in subnets['subnets']:
if (subnet['gateway_ip'] and
subnet['ip_version'] == constants.IP_VERSION_4):
return subnet['gateway_ip']
def _create_server_for_topology(
self, network_id=None, port_type=None,
different_host=None, port_qos_policy_id=None):
if not network_id:
network_id = self.network['id']
if port_type:
kwargs = {'binding:vnic_type': port_type,
'qos_policy_id': port_qos_policy_id}
port = self.create_port(
network={'id': network_id}, **kwargs)
networks = [{'port': port['id']}]
else:
networks = [{'uuid': network_id}]
params = {
'flavor_ref': self.flavor_ref,
'image_ref': self.image_ref,
'key_name': self.keypair['name'],
'networks': networks,
'security_groups': [
{'name': self.secgroup['security_group']['name']}],
'name': data_utils.rand_name(self._testMethodName)
}
if port_type == 'direct-physical':
net_vlan = self.client.show_network(
network_id)['network']['provider:segmentation_id']
params['user_data'] = build_user_data(net_vlan)
params['config_drive'] = True
if (different_host and CONF.compute.min_compute_nodes > 1):
params['scheduler_hints'] = {
'different_host': different_host['id']}
server = self.create_server(**params)['server']
if different_host and CONF.compute.min_compute_nodes > 1:
if (self.get_host_for_server(different_host['id']) ==
self.get_host_for_server(server['id'])):
raise self.skipException(
'Failed to run the VM on a different hypervisor, make '
'sure that DifferentHostFilter is in the list of '
'enabled nova scheduler filters')
port = self.client.list_ports(device_id=server['id'])['ports'][0]
if network_id == CONF.network.public_network_id:
access_ip_address = port['fixed_ips'][0]['ip_address']
else:
access_ip_address = self.create_floatingip(
port=port)['floating_ip_address']
server['ssh_client'] = ssh.Client(access_ip_address,
self.username,
pkey=self.keypair['private_key'])
return server
def _create_vms_by_topology(
self, topology='internal', port_type=None, ipv6=False,
different_host=True, num_vms_created=2):
"""Function for creating desired topology for the test
Available topologies:
* internal(default): sender and receiver are on tenant network
* external: sender and receiver are on external(public) network
* east-west: sender and receiver are on different tenant networks
* north-south: sender is on external and receiver on tenant network
:param topology(str): one of 4 available topologies to use (see list
above)
:param port_type(str): type of port to use. If omitted, default port
type will be used. Can be set to 'direct' or 'direct-physical'
for SR-IOV environments.
:param different_host(bool): whether to force vms to run on different
host.
:param num_vms_created(int): number of vms to create, 1 or 2.
default is 2.
:returns: sender if num_vms_created is 1, else server and receiver
"""
# num_vms_created can be 1 or 2
self.assertIn(num_vms_created, [1, 2], "num_vms_created can be 1 or 2")
def _create_local_network():
network = self.create_network()
subnet_index = len(self.reserved_subnet_cidrs)
cidr = '192.168.%d.0/24' % subnet_index
subnet = self.create_subnet(network, cidr=cidr)
self.create_router_interface(router['id'], subnet['id'])
if ipv6:
ipv6_cidr = '2001:{:x}::/64'.format(200 + subnet_index)
ra_address_mode = 'dhcpv6-stateless'
ipv6_subnet = self.create_subnet(
network, cidr=ipv6_cidr, ip_version=6,
ipv6_ra_mode=ra_address_mode,
ipv6_address_mode=ra_address_mode)
self.create_router_interface(router['id'], ipv6_subnet['id'])
return network
if topology != 'external':
if hasattr(self, "router") and self.router:
router = self.router
else:
router = self.create_router_by_client()
if topology == 'external' or topology == 'north-south':
external_network = self.client.show_network(
CONF.network.public_network_id)['network']
if not external_network['shared']:
skip_reason = "External network is not shared"
self.skipTest(skip_reason)
src_network = external_network
else:
src_network = _create_local_network()
sender = self._create_server_for_topology(
network_id=src_network['id'],
port_type=port_type)
if topology == 'external' or topology == 'internal':
dst_network = src_network
else:
dst_network = _create_local_network()
different_host = sender if different_host else None
if num_vms_created == 1:
return sender
receiver = self._create_server_for_topology(
different_host=different_host, network_id=dst_network['id'],
port_type=port_type)
return sender, receiver
class TrafficFlowTest(BaseTempestWhiteboxTestCase):
force_tenant_isolation = False
@classmethod
@utils.requires_ext(extension="router", service="network")
def skip_checks(cls):
super(TrafficFlowTest, cls).skip_checks()
if not CONF.network.public_network_id:
raise cls.skipException(
'The public_network_id option must be specified.')
if not WB_CONF.run_traffic_flow_tests:
raise cls.skipException(
"CONF.whitebox_neutron_plugin_options."
"run_traffic_flow_tests set to False.")
@classmethod
def resource_setup(cls):
super(TrafficFlowTest, cls).resource_setup()
cls.gateway_external_ip = cls.get_external_gateway()
if not cls.gateway_external_ip:
raise cls.skipException("IPv4 gateway is not configured "
"for public network or public_network_id "
"is not configured.")
def _start_captures(self, interface, filters):
for node in self.nodes:
node['capture'] = capture.TcpdumpCapture(
node['client'], interface, filters)
self.useFixture(node['capture'])
time.sleep(2)
def _stop_captures(self):
for node in self.nodes:
node['capture'].stop()
def check_east_west_icmp_flow(
self, dst_ip, expected_routing_nodes, expected_macs, ssh_client):
"""Check that traffic routed as expected within a tenant network
Both directions are supported.
Traffic is captured on
CONF.whitebox_neutron_plugin_options.node_tunnel_interface.
Use values:
genev_sys_6081 for OVN
vxlanxx for ML2/OVS with VXLAN tunnels
<vlanid> for ML2/OVS with VLAN tunnels
:param dst_ip(str): Destination IP address that we check route to
:param expected_routing_nodes(list): Hostnames of expected gateways,
nodes on tunnel interface of which we expect
to find ethernet frames with packets that we send
:param expected_macs(tuple): pair of MAC addresses of ports that we
expect to find on the captured packets
:param ssh_client(Client): SSH client object of the origin of traffic
(the one that we send traffic from)
"""
interface = CONF.whitebox_neutron_plugin_options.node_tunnel_interface
# create filters
if type(expected_macs) is tuple:
filters = 'icmp and ether host {0} and ether host {1}'.format(
expected_macs[0],
expected_macs[1])
elif type(expected_macs) is list:
filters = ('"icmp and ((ether host {0} and ether host {1}) '
'or (ether host {2} and ether host {3}))"').format(
expected_macs[0][0],
expected_macs[0][1],
expected_macs[1][0],
expected_macs[1][1])
else:
raise TypeError(expected_macs)
self._start_captures(interface, filters)
self.check_remote_connectivity(ssh_client, dst_ip, ping_count=2)
self._stop_captures()
LOG.debug('Expected routing nodes: {}'.format(
','.join(expected_routing_nodes)))
actual_routing_nodes = [node['name']
for node in self.nodes if
not node['capture'].is_empty()]
LOG.debug('Actual routing nodes: {}'.format(
','.join(actual_routing_nodes)))
self.assertCountEqual(expected_routing_nodes, actual_routing_nodes)
def check_north_south_icmp_flow(
self, dst_ip, expected_routing_nodes, expected_mac, ssh_client,
ignore_outbound=False):
"""Check that traffic routed as expected between internal and external
networks. Both directions are supported.
:param dst_ip(str): Destination IP address that we check route to
:param expected_routing_nodes(list): Hostnames of expected gateways,
nodes on external interface of which we expect
to find ethernet frames with packets that we send
:param expected_mac(str): MAC address of a port that we expect to find
on the expected gateway external interface
:param ssh_client(Client): SSH client object of the origin of traffic
(the one that we send traffic from)
:param ignore_outbound(bool): Whether to ignore outbound packets.
This helps to avoid false positives.
"""
interface = WB_CONF.node_ext_interface
inbound = '-Qin' if ignore_outbound else ''
size = None
if not WB_CONF.bgp:
filters = '{} icmp and ether host {}'.format(inbound, expected_mac)
else:
filters = "{} icmp and icmp[0] == 8".format(inbound)
size = random.randint(0, 50)
# Adjust payload size adding icmp header size
if netaddr.valid_ipv6(dst_ip):
size += 44
else:
size += 28
# Filter including ip size packet
filters += " and ip[2:2]=={} and ip dst {}".format(size, dst_ip)
self._start_captures(interface, filters)
# if the host is localhost, don't use remote connectivity,
# ping directly on the host
if ssh_client.host in (
'localhost', '127.0.0.1', '0:0:0:0:0:0:0:1', '::1'):
self.ping_ip_address(dst_ip, mtu=size, should_succeed=True)
# tcpdump requires a delay between capturing packets and writing
# them to its file.
time.sleep(2)
else:
self.check_remote_connectivity(
ssh_client, dst_ip, mtu=size, ping_count=2)
self._stop_captures()
LOG.debug('Expected routing nodes: {}'.format(expected_routing_nodes))
actual_routing_nodes = [node['name']
for node in self.nodes if
not node['capture'].is_empty()]
LOG.debug('Actual routing nodes: {}'.format(
','.join(actual_routing_nodes)))
self.assertCountEqual(expected_routing_nodes, actual_routing_nodes)
class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase):
@classmethod
def resource_setup(cls):
super(BaseTempestTestCaseOvn, cls).resource_setup()
agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
ovn_agents = [agent for agent in agents if 'ovn' in agent['binary']]
if not ovn_agents:
if not cls.has_ovn_support:
raise cls.skipException(
"OVN agents not found. This test is supported only on "
"openstack environments with OVN support.")
@ -189,3 +480,34 @@ class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase):
'name=provnet-{sid}'.format(cmd=self.nbctl, sid=segment_id)
output = self.run_on_master_controller(cmd)
self.assertEqual(output, '')
# user_data_cmd is used to generate a VLAN interface on VM instances with PF
# ports
user_data_cmd = """
#cloud-config
write_files:
- path: "/etc/sysconfig/network-scripts/ifcfg-%s"
owner: "root"
permissions: "777"
content: |
DEVICE="%s"
BOOTPROTO="dhcp"
ONBOOT="yes"
VLAN="yes"
PERSISTENT_DHCLIENT="yes"
runcmd:
- [ sh, -c , "systemctl restart NetworkManager" ]
"""
user_data_cmd = user_data_cmd.replace('\t', '')
def build_user_data(net_vlan):
"""user_data is required when direct-physical (PF) ports are used
"""
if_full_name = '%s.%s' % \
(WB_CONF.default_instance_interface,
net_vlan)
user_data = base64.b64encode((
user_data_cmd % (if_full_name, if_full_name)).encode("utf-8"))
return user_data

View File

@ -0,0 +1,349 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import time
import netaddr
from neutron_lib import constants
from neutron_tempest_plugin.common import ip
from neutron_tempest_plugin.common import ssh
from oslo_log import log
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from whitebox_neutron_tempest_plugin.tests.scenario import base
CONF = config.CONF
WB_CONF = CONF.whitebox_neutron_plugin_options
LOG = log.getLogger(__name__)
def get_capture_script(result_file, interface):
return """#!/bin/bash
export LC_ALL=en_US.UTF-8
tcpdump -Qin -i %(interface)s -vvneA -s0 -l icmp &> %(result_file)s &
""" % {'result_file': result_file,
'interface': interface}
# TODO(eolivare): create a parent class for BaseBroadcastTest and
# BaseMulticastTest with common code
class BaseBroadcastTest(object):
vlan_transparent = False
servers = []
# Import configuration options
receivers_count = WB_CONF.broadcast_receivers_count
capture_output_file = "/tmp/capture_broadcast_out"
# the following values can be used to send ping messages including once the
# text "Hi here"
broadcast_message = "Hi here"
broadcast_message_pattern = "486920686572652e"
ping_size = 23
@classmethod
def skip_checks(cls):
super(BaseBroadcastTest, cls).skip_checks()
advanced_image_available = (
CONF.neutron_plugin_options.advanced_image_ref or
CONF.neutron_plugin_options.default_image_is_advanced)
if not advanced_image_available:
skip_reason = "This test require advanced tools for this test"
raise cls.skipException(skip_reason)
@classmethod
def resource_setup(cls):
super(BaseBroadcastTest, cls).resource_setup()
if CONF.neutron_plugin_options.default_image_is_advanced:
cls.flavor_ref = CONF.compute.flavor_ref
cls.image_ref = CONF.compute.image_ref
cls.username = CONF.validation.image_ssh_user
else:
cls.flavor_ref = (
CONF.neutron_plugin_options.advanced_image_flavor_ref)
cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
# setup basic topology for servers we can log into it
try:
if cls.vlan_transparent:
cls.network = cls.create_network(
vlan_transparent=cls.vlan_transparent)
else:
cls.network = cls.create_network()
except exceptions.ServerFault as exc:
msg = 'Backend does not support VLAN Transparency.'
if exc.resp_body['message'] == msg:
raise cls.skipException(msg)
else:
raise exc
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router_by_client()
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name='secgroup_bcast')
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
cls.create_pingable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
def _create_server(self, server_type, vlan_tag=None, scheduler_hints=None):
network_id = self.network['id']
if self.vlan_transparent:
server_index = len(self.servers)
vlan_ipmask = self.vlan_ipmast_template.format(
ip_last_byte=server_index + 10)
allowed_address_pairs = [{'ip_address': vlan_ipmask}]
port = self.create_port(
network={'id': network_id},
security_groups=[self.secgroup['security_group']['id']],
allowed_address_pairs=allowed_address_pairs)
networks = [{'port': port['id']}]
else:
networks = [{'uuid': network_id}]
params = {
'flavor_ref': self.flavor_ref,
'image_ref': self.image_ref,
'key_name': self.keypair['name'],
'networks': networks,
'security_groups': [
{'name': self.secgroup['security_group']['name']}],
'name': data_utils.rand_name(server_type)
}
if not (CONF.compute.min_compute_nodes > 1):
LOG.debug('number of compute nodes is not higher than 1 - '
'scheduler_hints will not be used')
elif scheduler_hints:
params['scheduler_hints'] = scheduler_hints
server = self.create_server(**params)['server']
self.wait_for_server_active(server)
port = self.client.list_ports(device_id=server['id'])['ports'][0]
access_ip_address = self.create_floatingip(
port=port)['floating_ip_address']
server['ssh_client'] = ssh.Client(access_ip_address,
self.username,
pkey=self.keypair['private_key'])
# enable icmp broadcast responses
server['ssh_client'].execute_script(
"sysctl net.ipv4.icmp_echo_ignore_broadcasts=0", become_root=True)
if self.vlan_transparent:
# configure transparent vlan on server
vlan_tag = vlan_tag or self.vlan_tag
server['vlan_device'] = self._configure_vlan_transparent(
port, server['ssh_client'], vlan_ipmask, vlan_tag)
self.servers.append(server)
return server
def _prepare_capture_script(self, server):
interface = server['vlan_device'] if self.vlan_transparent else 'any'
capture_script = get_capture_script(
result_file=self.capture_output_file,
interface=interface)
server['ssh_client'].execute_script(
'echo "%s" > /tmp/capture_script.sh' % capture_script)
def _check_broadcast_conectivity(self, sender, receivers,
nonreceivers=[], num_pings=1):
def _message_received(client, msg, file_path):
result = client.execute_script(
"cat {path} || echo '{path} not exists yet'".format(
path=file_path))
return msg in result
def _validate_number_of_messages(client, file_path, expected_count):
"""This function validates number of packets that reached a VM
The function compares actual received number of packets per group
with the expected number.
"""
result = client.execute_script(
"cat {path} || echo '{path} not exists yet'".format(
path=file_path))
# We need to make sure that exactly the expected count
# of messages reached receiver, no more and no less
LOG.debug('result = {}'. format(result))
count = len(
re.findall(self.broadcast_message, result))
self.assertEqual(
count, expected_count,
'Received number of messages ({}) differs '
'from the expected ({})'.format(
count, num_pings))
# tcpdump started both on receivers and nonreceivers
for server in receivers + nonreceivers:
self._prepare_capture_script(server)
server['ssh_client'].execute_script(
"bash /tmp/capture_script.sh", become_root=True)
if not self.vlan_transparent:
cidr = self.subnet['cidr']
else:
cidr = self.vlan_ipmast_template.format(ip_last_byte=0)
broadcast_ip = netaddr.IPNetwork(cidr).broadcast
# waiting until tcpdump capturing on receivers
time.sleep(2)
sender['ssh_client'].execute_script(
("ping -b %(broadcast_ip)s -s %(ping_size)d "
"-c %(num_pings)d -p %(ping_pattern)s") % {
'broadcast_ip': broadcast_ip,
'ping_size': self.ping_size,
'num_pings': num_pings,
'ping_pattern': self.broadcast_message_pattern})
# waiting until packets reached receivers
time.sleep(2)
# num_ping packets expected on each receiver server
for server in receivers:
server['ssh_client'].execute_script(
"killall tcpdump && sleep 2", become_root=True)
LOG.debug('Validating number of messages on '
'receiver {}'.format(server['id']))
_validate_number_of_messages(
server['ssh_client'], self.capture_output_file, num_pings)
# No packets expected on each nonreceiver server
for server in nonreceivers:
server['ssh_client'].execute_script(
"killall tcpdump && sleep 2", become_root=True)
LOG.debug('Validating number of messages on '
'receiver {}'.format(server['id']))
_validate_number_of_messages(
server['ssh_client'], self.capture_output_file, 0)
class BroadcastTestIPv4(BaseBroadcastTest, base.TrafficFlowTest):
# IP version specific parameters
_ip_version = constants.IP_VERSION_4
any_addresses = constants.IPv4_ANY
class BroadcastTestIPv4Common(BroadcastTestIPv4):
@decorators.idempotent_id('7f33370a-5f46-4452-8b2f-166acda1720f')
def test_broadcast_same_network(self):
"""Test broadcast messaging between servers on the same network
[Sender server] -> (Broadcast network) -> [Receiver server]
Scenario:
1. Create VMs for sender, receiver(s) on a common internal network
and send broadcast ping messages
2. Verify that all broadcast packets reach the other hosts
In case of vlan_transparent, broadcast packets will be captured on VLAN
interfaces
"""
sender = self._create_server('broadcast-sender')
receivers = []
for i in range(self.receivers_count):
# even VMs scheduled on a different compute from the sender
# odd VMs scheduled on the same compute than the sender
if i % 2 == 0:
scheduler_hints = {'different_host': sender['id']}
else:
scheduler_hints = {'same_host': sender['id']}
receivers.append(self._create_server(
'broadcast-receiver', scheduler_hints=scheduler_hints))
self._check_broadcast_conectivity(sender=sender,
receivers=receivers,
num_pings=2)
class BroadcastTestVlanTransparency(BroadcastTestIPv4):
required_extensions = ['vlan-transparent', 'allowed-address-pairs']
vlan_transparent = True
vlan_tag = 123
vlan_ipmast_template = '192.168.111.{ip_last_byte}/24'
def _configure_vlan_transparent(self, port, ssh_client, vlan_ip, vlan_tag):
ip_command = ip.IPCommand(ssh_client=ssh_client)
for address in ip_command.list_addresses(port=port):
port_iface = address.device.name
break
else:
self.fail("Parent port fixed IP not found on server.")
subport_iface = ip_command.configure_vlan_transparent(
port=port, vlan_tag=vlan_tag, ip_addresses=[vlan_ip])
for address in ip_command.list_addresses(ip_addresses=vlan_ip):
self.assertEqual(subport_iface, address.device.name)
self.assertEqual(port_iface, address.device.parent)
break
else:
self.fail("Sub-port fixed IP not found on server.")
return subport_iface
@decorators.idempotent_id('7ea12762-31af-4bf2-9219-c54212171010')
def test_broadcast_vlan_transparency(self):
"""Test broadcast messaging between servers on the same network, but
using different VLANs
[Sender server] -> (Broadcast network) -> [Receiver server]
Scenario:
1. Create VMs for sender, receiver(s) on a common internal network
and send broadcast ping messages
2. Verify that all broadcast packets reach the other hosts within
a common VLAN and that receivers with different VLANs do not
receive those messages
"""
vlan_groups = (self.vlan_tag, self.vlan_tag + 1)
senders = {}
receivers = {}
for vlan_group in vlan_groups:
senders[vlan_group] = self._create_server(
'broadcast-sender-%d' % vlan_group, vlan_tag=vlan_group)
receivers[vlan_group] = []
for i in range(self.receivers_count):
# even VMs scheduled on a different compute from the sender
# odd VMs scheduled on the same compute than the sender
if i % 2 == 0:
scheduler_hints = {
'different_host': senders[vlan_group]['id']}
else:
scheduler_hints = {'same_host': senders[vlan_group]['id']}
receivers[vlan_group].append(
self._create_server('broadcast-receiver-%d' % vlan_group,
vlan_tag=vlan_group,
scheduler_hints=scheduler_hints))
self._check_broadcast_conectivity(
sender=senders[vlan_groups[0]],
receivers=receivers[vlan_groups[0]],
nonreceivers=receivers[vlan_groups[1]], num_pings=2)
self._check_broadcast_conectivity(
sender=senders[vlan_groups[1]],
receivers=receivers[vlan_groups[1]],
nonreceivers=receivers[vlan_groups[0]], num_pings=2)

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@
- x/whitebox-neutron-tempest-plugin
- openstack/tempest
vars:
tempest_concurrency: 4 # out of 4
tempest_concurrency: 2 # out of 4
tox_envlist: all
# NOTE(slaweq): in case of some tests, which requires advanced image,
# default test timeout set to 1200 seconds may be not enough if job is
@ -196,6 +196,9 @@
image_is_advanced: true
available_type_drivers: flat,geneve,vlan,gre,local,vxlan
provider_net_base_segm_id: 1
whitebox_neutron_plugin_options:
run_traffic_flow_tests: True
broadcast_receivers_count: 1
group-vars:
subnode:
devstack_plugins: