Add extra dhcp options tests

Tests moved from downstream plugin with minimal adjustments.
These tests require nmcli inside the server so advanced image
changed to rocky-9 cloud image. Other alternative, e.g. centos-9
does not have a default user and requires additional customization
for setting the user.
Additionally, moved utils.py and constants.py.
From config.py and base.py only relevant subset of code moved.

Change-Id: I6b538c13a15e94d072091b3218b2cd48ff20a70c
This commit is contained in:
Roman Safronov 2023-12-06 12:06:57 +02:00
parent b53e623b63
commit fa2d7457a8
8 changed files with 841 additions and 7 deletions

3
devstack/plugin.sh Normal file
View File

@ -0,0 +1,3 @@
if [[ "$1" == "stack" ]] && [[ "$2" == "install" ]]; then
echo "tempest ALL=(ALL) NOPASSWD: ALL" | sudo tee /etc/sudoers.d/99_tempest
fi

View File

@ -33,7 +33,8 @@ commands = {posargs}
# H904: Delay string interpolations at logging calls # H904: Delay string interpolations at logging calls
enable-extensions = H106,H203,H204,H205,H904 enable-extensions = H106,H203,H204,H205,H904
# H405: multi line docstring summary not separated with an empty line # H405: multi line docstring summary not separated with an empty line
ignore = H405 # W504: line break after binary operator
ignore = H405,W504
show-source = true show-source = true
exclude = ./.*,build,dist,doc,*egg*,releasenotes exclude = ./.*,build,dist,doc,*egg*,releasenotes
import-order-style = pep8 import-order-style = pep8

View File

@ -0,0 +1,42 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
GLOBAL_IP = '1.1.1.1'
NCAT_PORT = 65000
NCAT_TIMEOUT = 30
IP_HEADER_LENGTH = 20
TCP_HEADER_LENGTH = 20
UDP_HEADER_LENGTH = 8
ETHERNET_HEADER_LENGTH = 18
STATE_UP = 'up'
STATE_DOWN = 'down'
ACTION_STOP = 'stop'
ACTION_START = 'start'
DHCP_OPTIONS_NMCLI_TO_NEUTRON = {
'dhcp_lease_time': 'lease-time',
'domain_name': 'domain-name',
'domain_name_servers': 'dns-server',
'interface_mtu': 'mtu',
'dhcp6_domain_search': 'domain-search',
'dhcp6_name_servers': 'dns-server'}
DHCP_OPTIONS_NUMBER_TO_NAME = {
'26': 'mtu'}
DHCP_OPTIONS_NEUTRON_TO_OVN = {
'domain-name': 'domain_name',
'mtu': 'mtu',
'dns-server': 'dns_server',
'lease-time': 'lease_time',
'domain-search': 'domain_search',
'26': 'mtu'}

View File

@ -0,0 +1,214 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import subprocess
import time
from neutron_tempest_plugin.common import shell
from neutron_tempest_plugin.common import utils as common_utils
from oslo_log import log
from tempest import config
from tempest.lib import exceptions
from whitebox_neutron_tempest_plugin.common import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
def create_payload_file(ssh_client, size):
ssh_client.exec_command(
"head -c {0} /dev/zero > {0}".format(size))
def get_temp_file(ssh_client):
output_file = ssh_client.exec_command(
'mktemp').rstrip()
return output_file
def cat_remote_file(ssh_client, path):
return ssh_client.exec_command(
'cat {}'.format(path)).rstrip()
def get_default_interface(ssh_client):
return ssh_client.exec_command(
"PATH=$PATH:/usr/sbin ip route get default %s | head -1 | "
"cut -d ' ' -f 5" % constants.GLOBAL_IP).rstrip()
def get_route_interface(ssh_client, dst_ip):
output = ssh_client.exec_command(
"PATH=$PATH:/usr/sbin ip route get default %s | head -1" % dst_ip)
if output:
for line in output.splitlines():
fields = line.strip().split()
device_index = fields.index('dev') + 1
return fields[device_index]
else:
return None
def make_sure_local_port_is_open(protocol, port):
shell.execute_local_command(
"sudo iptables-save | "
r"grep 'INPUT.*{protocol}.*\-\-dport {port} \-j ACCEPT' "
"&& true || "
"sudo iptables -I INPUT 1 -p {protocol} --dport {port} -j ACCEPT"
"".format(protocol=protocol, port=port))
# Unlike ncat server function from the upstream plugin this ncat server
# turns itself off automatically after timeout
def run_ncat_server(ssh_client, udp):
output_file = get_temp_file(ssh_client)
cmd = "sudo timeout {0} nc -l {1} -p {2} > {3}".format(
constants.NCAT_TIMEOUT, udp, constants.NCAT_PORT, output_file)
LOG.debug("Starting nc server: '%s'", cmd)
ssh_client.open_session().exec_command(cmd)
return output_file
# Unlike ncat client function from the upstream plugin this ncat client
# is able to run from any host, not only locally
def run_ncat_client(ssh_client, host, udp, payload_size):
cmd = "nc -w 1 {0} {1} {2} < {3}".format(
host, udp, constants.NCAT_PORT, payload_size)
LOG.debug("Starting nc client: '%s'", cmd)
ssh_client.exec_command(cmd)
def flush_routing_cache(ssh_client):
ssh_client.exec_command("sudo ip route flush cache")
def kill_iperf_process(ssh_client):
cmd = "PATH=$PATH:/usr/sbin pkill iperf3"
try:
ssh_client.exec_command(cmd)
except exceptions.SSHExecCommandFailed:
pass
def configure_interface_up(client, port, interface=None, path=None):
"""configures down interface with ip and activates it
Parameters:
client (ssh.Client):ssh client which has interface to configure.
port (port):port object of interface.
interface (str):optional interface name on vm.
path (str):optional shell PATH variable.
"""
shell_path = path or "PATH=$PATH:/sbin"
test_interface = interface or client.exec_command(
"{};ip addr | grep {} -B 1 | head -1 | "
r"cut -d ':' -f 2 | sed 's/\ //g'".format(
shell_path, port['mac_address'])).rstrip()
if CONF.neutron_plugin_options.default_image_is_advanced:
cmd = ("ip addr show {interface} | grep {ip} || "
"sudo dhclient {interface}").format(
ip=port['fixed_ips'][0]['ip_address'],
interface=test_interface)
else:
cmd = ("cat /sys/class/net/{interface}/operstate | "
"grep -q -v down && true || "
"({path}; sudo ip link set {interface} up && "
"sudo ip addr add {ip}/24 dev {interface})").format(
path=shell_path,
ip=port['fixed_ips'][0]['ip_address'],
interface=test_interface)
common_utils.wait_until_true(
lambda: execute_command_safely(client, cmd), timeout=30, sleep=5)
def parse_dhcp_options_from_nmcli(
ssh_client, ip_version,
timeout=20.0, interval=5.0, expected_empty=False, vlan=None):
# first of all, test ssh connection is available - the time it takes until
# ssh connection can be established is not cosidered for the nmcli timeout
ssh_client.test_connection_auth()
# Add grep -v to exclude loopback interface because
# Managing the lookback interface using NetworkManager is included in
# RHEL9.2 image. Previous version is not included.
cmd_find_connection = 'nmcli -g NAME con show --active | grep -v "^lo"'
if vlan is not None:
cmd_find_connection += ' | grep {}'.format(vlan)
cmd_show_dhcp = ('sudo nmcli -f DHCP{} con show '
'"$({})"').format(ip_version, cmd_find_connection)
start_time = time.time()
while True:
try:
output = ssh_client.exec_command(cmd_show_dhcp)
except exceptions.SSHExecCommandFailed:
LOG.warning('Failed to run nmcli on VM - retrying...')
else:
if not output and not expected_empty:
LOG.warning('nmcli result on VM is empty - retrying...')
else:
break
if time.time() - start_time > timeout:
message = ('Failed to run nmcli on VM after {} '
'seconds'.format(timeout))
raise exceptions.TimeoutException(message)
time.sleep(interval)
if not output:
return None
obtained_dhcp_opts = {}
for line in output.splitlines():
newline = re.sub(r'^DHCP{}.OPTION\[[0-9]+\]:\s+'.format(ip_version),
'', line.strip())
option = newline.split('=')[0].strip()
value = newline.split('=')[1].strip()
if option in constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON:
option = constants.DHCP_OPTIONS_NMCLI_TO_NEUTRON[option]
obtained_dhcp_opts[option] = value
return obtained_dhcp_opts
def execute_command_safely(ssh_client, command):
try:
output = ssh_client.exec_command(command)
except exceptions.SSHExecCommandFailed as err:
LOG.warning('command failed: %s', command)
LOG.exception(err)
return False
LOG.debug('command executed successfully: %s\n'
'command output:\n%s',
command, output)
return True
def host_responds_to_ping(ip, count=3):
cmd = "ping -c{} {}".format(count, ip)
try:
subprocess.check_output(['bash', '-c', cmd])
except subprocess.CalledProcessError:
return False
return True
def run_local_cmd(cmd, timeout=10):
command = "timeout " + str(timeout) + " " + cmd
LOG.debug("Running local command '{}'".format(command))
output, errors = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
return output, errors

View File

@ -21,4 +21,21 @@ whitebox_neutron_plugin_options = cfg.OptGroup(
title="Whitebox neutron tempest plugin config options" title="Whitebox neutron tempest plugin config options"
) )
WhiteboxNeutronPluginOptions = [] WhiteboxNeutronPluginOptions = [
cfg.StrOpt('openstack_type',
default='devstack',
help='Type of openstack deployment, '
'e.g. devstack, tripeo, podified'),
cfg.StrOpt('pki_private_key',
default='/etc/pki/tls/private/ovn_controller.key',
help='File with private key. Need for TLS-everywhere '
'environments.'),
cfg.StrOpt('pki_certificate',
default='/etc/pki/tls/certs/ovn_controller.crt',
help='File with certificate for private key. Need for '
'TLS-everywhere environments.'),
cfg.StrOpt('pki_ca_cert',
default='/etc/ipa/ca.crt',
help='File with peer CA certificate. Need for TLS-everywhere '
'environments.')
]

View File

@ -0,0 +1,187 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import netaddr
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.common import utils as common_utils
from oslo_log import log
from tempest import config
from whitebox_neutron_tempest_plugin.common import utils as local_utils
CONF = config.CONF
LOG = log.getLogger(__name__)
WB_CONF = config.CONF.whitebox_neutron_plugin_options
class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
@classmethod
def resource_setup(cls):
super(BaseTempestWhiteboxTestCase, cls).resource_setup()
uri = CONF.identity.uri
cls.is_ipv6 = True if netaddr.valid_ipv6(
uri[uri.find("[") + 1:uri.find("]")]) else False
cls.image_ref = CONF.compute.image_ref
cls.flavor_ref = CONF.compute.flavor_ref
cls.username = CONF.validation.image_ssh_user
@classmethod
def run_on_master_controller(cls, cmd):
if WB_CONF.openstack_type == 'devstack':
output, errors = local_utils.run_local_cmd(cmd)
LOG.debug("Stderr: {}".format(errors.decode()))
output = output.decode()
LOG.debug("Output: {}".format(output))
return output.strip()
class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase):
@classmethod
def resource_setup(cls):
super(BaseTempestTestCaseOvn, cls).resource_setup()
agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
ovn_agents = [agent for agent in agents if 'ovn' in agent['binary']]
if not ovn_agents:
raise cls.skipException(
"OVN agents not found. This test is supported only on "
"openstack environments with OVN support.")
cls.nbctl, cls.sbctl = cls._get_ovn_dbs()
cls.nbmonitorcmd, cls.sbmonitorcmd = cls._get_ovn_db_monitor_cmds()
@classmethod
def _get_ovn_db_monitor_cmds(cls):
regex = r'--db=(.*)$'
# this regex search will return the connection string (tcp:IP:port or
# ssl:IP:port) and in case of TLS, will also include the TLS options
nb_monitor_connection_opts = re.search(regex, cls.nbctl).group(1)
sb_monitor_connection_opts = re.search(regex, cls.sbctl).group(1)
monitorcmdprefix = 'sudo timeout 300 ovsdb-client monitor -f json '
return (monitorcmdprefix + nb_monitor_connection_opts,
monitorcmdprefix + sb_monitor_connection_opts)
@classmethod
def _get_ovn_dbs(cls):
ssl_params = ''
if WB_CONF.openstack_type == 'tripleo':
cmd = ("sudo ovs-vsctl get open . external_ids:ovn-remote | "
"sed -e 's/\"//g'")
sbdb = cls.run_on_master_controller(cmd)
if 'ssl' in sbdb:
ssl_params = '-p {} -c {} -C {} '.format(
WB_CONF.pki_private_key,
WB_CONF.pki_certificate,
WB_CONF.pki_ca_cert)
nbdb = sbdb.replace('6642', '6641')
cmd = 'ovn-{}ctl --db={} %s' % (ssl_params)
cmd = 'sudo %s exec ovn_controller %s' % (cls.container_app, cmd)
if WB_CONF.openstack_type == 'devstack':
sbdb = "unix:/usr/local/var/run/ovn/ovnsb_db.sock"
nbdb = sbdb.replace('sb', 'nb')
cmd = ("sudo ovn-{}ctl --db={}")
return [cmd.format('nb', nbdb), cmd.format('sb', sbdb)]
def get_router_gateway_chassis(self, router_port_id):
cmd = "{} get port_binding cr-lrp-{} chassis".format(
self.sbctl, router_port_id)
LOG.debug("Waiting until port is bound to chassis")
self.chassis_id = None
def _port_binding_exist():
self.chassis_id = self.run_on_master_controller(cmd)
LOG.debug("chassis_id = '{}'".format(self.chassis_id))
if self.chassis_id != '[]':
return True
return False
try:
common_utils.wait_until_true(lambda: _port_binding_exist(),
timeout=30, sleep=5)
except common_utils.WaitTimeout:
self.fail("Port is not bound to chassis")
cmd = "{} get chassis {} hostname".format(self.sbctl, self.chassis_id)
LOG.debug("Running '{}' on the master node".format(cmd))
res = self.run_on_master_controller(cmd)
return res.replace('"', '').split('.')[0]
def get_router_gateway_chassis_list(self, router_port_id):
cmd = (self.nbctl + " lrp-get-gateway-chassis lrp-" + router_port_id)
data = self.run_on_master_controller(cmd)
return [re.sub(r'.*_(.*?)\s.*', r'\1', s) for s in data.splitlines()]
def get_router_gateway_chassis_by_id(self, chassis_id):
res = self.run_on_master_controller(
self.sbctl + " get chassis " + chassis_id + " hostname").rstrip()
return res.replace('"', '').split('.')[0]
def get_router_port_gateway_mtu(self, router_port_id):
cmd = (self.nbctl + " get logical_router_port lrp-" + router_port_id +
" options:gateway_mtu")
return int(
self.run_on_master_controller(cmd).rstrip().strip('"'))
def get_item_uuid(self, db, item, search_string):
ovn_db = self.sbctl if db == 'sb' else self.nbctl
cmd = (ovn_db + " find " + item + " " + search_string +
" | grep _uuid | awk '{print $3}'")
return self.run_on_master_controller(cmd)
def get_datapath_tunnel_key(self, search_string):
cmd = (self.sbctl + " find datapath_binding " + search_string +
" | grep tunnel_key | awk '{print $3}'")
return self.run_on_master_controller(cmd)
def get_logical_switch(self, port):
"""Returns logical switch name that port is connected to
Fuction gets the logical switch name without its ID from the
`ovn-nbctl lsp-get-ls <PORT_NAME>` command
"""
cmd = '{cmd} lsp-get-ls {port}'.format(cmd=self.nbctl, port=port)
output = self.run_on_master_controller(cmd)
ls_name = re.search('neutron-[^)]*', output)
if ls_name:
return ls_name.group()
else:
return ''
def get_physical_net(self, port):
"""Returns physical network name that port has configured with
Physical network name is saved as option in the logical switch port
record in OVN north database. It can be queried with
`ovn-nbctl lsp-get-options <PORT_NAME>` command but this output may
contain more than one option so it is better to get the value with
`ovn-nbctl get Logical_Switch_Port <PORT_NAME> options:network_name`
command
"""
cmd = '{cmd} get Logical_Switch_Port {port} '\
'options:network_name'.format(cmd=self.nbctl, port=port)
return self.run_on_master_controller(cmd)
def verify_that_segment_deleted(self, segment_id):
"""Checks that the segment id is not in the OVN database
There shouldn't be 'provnet-<SEGEMTN_ID>' port in the OVN database
after the segment has been deleted
"""
cmd = '{cmd} find Logical_Switch_Port '\
'name=provnet-{sid}'.format(cmd=self.nbctl, sid=segment_id)
output = self.run_on_master_controller(cmd)
self.assertEqual(output, '')

View File

@ -0,0 +1,369 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from neutron_lib import constants as lib_constants
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants as neutron_constants
from oslo_log import log
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from whitebox_neutron_tempest_plugin.common import constants
from whitebox_neutron_tempest_plugin.common import utils
from whitebox_neutron_tempest_plugin.tests.scenario import base as ds_base
CONF = config.CONF
LOG = log.getLogger(__name__)
IPV4 = lib_constants.IP_VERSION_4
IPV6 = lib_constants.IP_VERSION_6
class ExtraDhcpOptionsTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
required_extensions = ['extra_dhcp_opt']
ipv4_cidr_pattern = '192.168.{}.0/24'
ipv6_cidr_pattern = '2001:{:x}::/64'
@classmethod
def resource_setup(cls):
super(ExtraDhcpOptionsTest, cls).resource_setup()
cls.rand_name = data_utils.rand_name(
cls.__name__.rsplit('.', 1)[-1])
cls.keypair = cls.create_keypair(name=cls.rand_name)
cls.security_group = cls.create_security_group(name=cls.rand_name)
cls.create_loginable_secgroup_rule(
cls.security_group['id'], client=cls.client)
if CONF.neutron_plugin_options.default_image_is_advanced:
cls.flavor_ref = CONF.compute.flavor_ref
cls.image_ref = CONF.compute.image_ref
cls.username = CONF.validation.image_ssh_user
else:
cls.flavor_ref = \
CONF.neutron_plugin_options.advanced_image_flavor_ref
cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
if (not cls.flavor_ref) or (not cls.image_ref):
raise cls.skipException(
'No advance image/flavor available for these tests')
def setUp(self):
super(ExtraDhcpOptionsTest, self).setUp()
self.subnet = None
self.port = None
self.server = None
self.dhcp_disabled = {}
def _check_created_extra_dhcp_opts(self):
# obtain the extra_dhcp_opts from Neutron API
retrieved = self.client.show_port(
self.port['id'])['port']['extra_dhcp_opts']
self.assertEqual(len(retrieved), len(self.extra_dhcp_opts))
for retrieved_option in retrieved:
for option in self.extra_dhcp_opts:
# default ip_version is 4
ip_version = option.get('ip_version', IPV4)
if (retrieved_option['opt_value'] == option['opt_value'] and
retrieved_option['opt_name'] == option['opt_name'] and
retrieved_option['ip_version'] == ip_version):
break
else:
self.fail('Extra DHCP option not found in port %s' %
str(retrieved_option))
def _create_port_and_check_dhcp_opts(
self,
ipv6=False,
ra_address_mode=None,
dhcp4_enabled=True,
dhcp6_enabled=True):
rand_name_test = self.rand_name + '-' + self._testMethodName
network = self.create_network(name=rand_name_test)
subnet_index = len(self.reserved_subnet_cidrs)
self.subnet = self.create_subnet(
network=network,
cidr=self.ipv4_cidr_pattern.format(subnet_index),
name=rand_name_test,
enable_dhcp=dhcp4_enabled)
router = self.create_router_by_client()
self.create_router_interface(router['id'], self.subnet['id'])
if ipv6:
if dhcp6_enabled:
subnetv6 = self.create_subnet(
network=network,
ip_version=IPV6,
name=rand_name_test + '-ipv6',
cidr=self.ipv6_cidr_pattern.format(subnet_index),
ipv6_ra_mode=ra_address_mode,
ipv6_address_mode=ra_address_mode)
else:
# when dhcp6 is disabled, neither ipv6_ra_mode nor
# ipv6_address_mode should be provided
subnetv6 = self.create_subnet(
network=network,
ip_version=IPV6,
name=rand_name_test + '-ipv6',
cidr=self.ipv6_cidr_pattern.format(subnet_index),
enable_dhcp=dhcp6_enabled)
self.create_router_interface(router['id'], subnetv6['id'])
# a port is created with the extra_dhcp_options from the test case
self.port = self.create_port(
network=network, name=rand_name_test,
security_groups=[self.security_group['id']],
extra_dhcp_opts=self.extra_dhcp_opts)
self._check_created_extra_dhcp_opts()
def _create_server_and_fip(self):
networks = [{'port': self.port['id']}]
# config_drive is needed when dhcp4 is disabled in order to provide
# an IPv4 to the server to be able to connect via ssh to it
config_drive = not self.subnet['enable_dhcp']
self.server = self.create_server(
flavor_ref=self.flavor_ref,
image_ref=self.image_ref,
key_name=self.keypair['name'],
networks=networks,
config_drive=config_drive,
name=self.rand_name + '-' + self._testMethodName)['server']
vm_fip = self.create_floatingip(port=self.port)['floating_ip_address']
return ssh.Client(vm_fip,
self.username,
pkey=self.keypair['private_key'])
def _test_extra_dhcp_opts_ipv4_ipv6(self, ra_address_mode):
self._create_port_and_check_dhcp_opts(
ipv6=True, ra_address_mode=ra_address_mode)
vm_ssh_client = self._create_server_and_fip()
nmcli_dhcp_options = {}
nmcli_dhcp_options[IPV4] = utils.parse_dhcp_options_from_nmcli(
vm_ssh_client, IPV4)
nmcli_dhcp_options[IPV6] = utils.parse_dhcp_options_from_nmcli(
vm_ssh_client, IPV6, timeout=100)
for extra_dhcp_opt in self.extra_dhcp_opts:
ip_version = extra_dhcp_opt.get('ip_version', IPV4)
# if opt_name is an integer value, then we need to obtain its
# corresponding option name
try:
int(extra_dhcp_opt['opt_name'])
except ValueError:
# it is not an integer
opt_name = extra_dhcp_opt['opt_name']
else:
opt_name = constants.DHCP_OPTIONS_NUMBER_TO_NAME[
extra_dhcp_opt['opt_name']]
self.assertIn(opt_name, nmcli_dhcp_options[ip_version])
# some formatting is needed before the value comparison:
# - remove extra " from Neutron option
# - remove extra \ from nmcli option
self.assertEqual(
extra_dhcp_opt['opt_value'].replace('"', ''),
nmcli_dhcp_options[ip_version][opt_name].replace('\\', ''))
@decorators.idempotent_id('8f52b4dc-faae-4f1d-b113-d2f3e86bf0d6')
def test_extra_dhcp_opts_ipv4_ipv6_stateful(self):
self.extra_dhcp_opts = [
{'opt_value': '2001:4860:4860::8888',
'opt_name': 'dns-server',
'ip_version': IPV6},
{'opt_value': '8.8.8.8',
'opt_name': 'dns-server',
'ip_version': IPV4},
{'opt_value': '1600',
'opt_name': '26'}] # 26 is option code for mtu
self._test_extra_dhcp_opts_ipv4_ipv6('dhcpv6-stateful')
@decorators.idempotent_id('e9e32249-6148-4565-b7b1-e64c77c9f4ec')
def test_extra_dhcp_opts_ipv4_ipv6_stateless(self):
self.extra_dhcp_opts = [
{'opt_value': '"ipv6.domain"', # domains should be between "
'opt_name': 'domain-search',
'ip_version': IPV6},
{'opt_value': '"ipv4.domain"', # ditto
'opt_name': 'domain-name',
'ip_version': IPV4}]
self._test_extra_dhcp_opts_ipv4_ipv6('dhcpv6-stateless')
@decorators.idempotent_id('ef41d6d8-f2bf-44e4-9f4d-bb8a3fed50ad')
def test_extra_dhcp_opts_disabled_enabled_dhcp4(self):
domain_value = 'ipv4.domain'
domain_opt = 'domain-name'
self.extra_dhcp_opts = [
{'opt_value': '"{}"'.format(domain_value),
'opt_name': domain_opt,
'ip_version': IPV4}]
# dhcp is disabled from the iPv4 subnet created
self._create_port_and_check_dhcp_opts(dhcp4_enabled=False)
vm_ssh_client = self._create_server_and_fip()
# ipv4.domain is not expected
vm_resolv_conf = vm_ssh_client.exec_command(
"sudo dhclient && cat /etc/resolv.conf")
self.assertIsNone(re.search(r'^search\s+{}$'.format(domain_value),
vm_resolv_conf,
re.MULTILINE))
# enable dhcp for the IPv4 subnet and reboot the VM
self.subnets[-1] = self.client.update_subnet(
self.subnets[-1]['id'], enable_dhcp=True)['subnet']
self.os_primary.servers_client.reboot_server(
self.server['id'], type='SOFT')
waiters.wait_for_server_status(self.os_primary.servers_client,
self.server['id'],
neutron_constants.SERVER_STATUS_ACTIVE)
# ipv4.domain is expected
vm_resolv_conf = vm_ssh_client.exec_command(
"sudo dhclient && cat /etc/resolv.conf")
self.assertIsNotNone(re.search(r'^search\s+{}$'.format(domain_value),
vm_resolv_conf,
re.MULTILINE))
@decorators.idempotent_id('abb12899-690a-407d-99d4-49eca030ce94')
def test_extra_dhcp_opts_disabled_dhcp6(self):
domain_value = 'ipv6.domain'
domain_opt = 'domain-search'
self.extra_dhcp_opts = [
{'opt_value': '"{}"'.format(domain_value),
'opt_name': domain_opt,
'ip_version': IPV6}]
# dhcp is disabled from the iPv6 subnet created
self._create_port_and_check_dhcp_opts(ipv6=True, dhcp6_enabled=False)
vm_ssh_client = self._create_server_and_fip()
# empty nmcli_dhcp6_options is expected
nmcli_dhcp6_options = utils.parse_dhcp_options_from_nmcli(
vm_ssh_client, IPV6, timeout=100, expected_empty=True)
self.assertIsNone(nmcli_dhcp6_options)
class OvnExtraDhcpOptionsTest(ExtraDhcpOptionsTest,
ds_base.BaseTempestTestCaseOvn):
def _check_created_extra_dhcp_opts(self):
def _check_options_from_ovn_nbdb(dhcp_option_uuids):
cmd2_pattern = '{} get dhcp_options {} options:{}'
# are dhcpv4 and dhcpv6 enabled?
subnets_dhcp_enabled = {subnet['ip_version']: subnet['enable_dhcp']
for subnet in self.subnets}
for extra_dhcp_opt in self.extra_dhcp_opts:
ip_version = extra_dhcp_opt.get('ip_version', IPV4)
if (not subnets_dhcp_enabled[ip_version] or
self.dhcp_disabled.get(ip_version) is True or
extra_dhcp_opt['opt_name'] == 'dhcp_disabled'):
continue
ovn_opt_name = constants.DHCP_OPTIONS_NEUTRON_TO_OVN[
extra_dhcp_opt['opt_name']]
# if the option is not found in the OVN DB, the command will
# fail and then the test will fail, which is correct
ovn_opt_value = self.run_on_master_controller(
cmd2_pattern.format(
self.nbctl,
dhcp_option_uuids[ip_version],
ovn_opt_name))
# some formatting is needed before the value comparison:
# - remove extra " from Neutron option
# - remove extra \ and " from OVN option
self.assertEqual(
extra_dhcp_opt['opt_value'].replace('"', ''),
ovn_opt_value.replace('"', '').replace('\\', '').strip())
# Verify no dhcpvX_option_uuid exist when DHCPvX is disabled
# Value '[]' means no entry in OVN NBDB dhcp_options table
for ip_version in subnets_dhcp_enabled:
if (subnets_dhcp_enabled[ip_version] is False or
self.dhcp_disabled.get(ip_version) is True):
self.assertEqual('[]', dhcp_option_uuids[ip_version])
# first, call parent's method
super(OvnExtraDhcpOptionsTest, self)._check_created_extra_dhcp_opts()
cmd1_pattern = '{} find logical_switch_port name={}'
output = self.run_on_master_controller(
cmd1_pattern.format(self.nbctl, self.port['id']))
dhcp_option_uuids = {}
for line in output.splitlines():
if re.search(r'^dhcpv4_options', line):
ip_version = IPV4
elif re.search(r'^dhcpv6_options', line):
ip_version = IPV6
else:
# skip this line
continue
dhcp_option_uuids[ip_version] = re.sub(
r'^dhcpv{}_options\s+: '.format(ip_version),
'',
line).strip()
_check_options_from_ovn_nbdb(dhcp_option_uuids)
@decorators.idempotent_id('30ef3221-e46b-4358-b550-d98c08272e1c')
def test_extra_dhcp_opts_dhcp_disabled_option(self):
self.extra_dhcp_opts = [
{'opt_value': '2001:4860:4860::8888',
'opt_name': 'dns-server',
'ip_version': IPV6},
{'opt_value': '8.8.8.8',
'opt_name': 'dns-server',
'ip_version': IPV4},
{'opt_value': 'True',
'opt_name': 'dhcp_disabled',
'ip_version': IPV6},
{'opt_value': 'False',
'opt_name': 'dhcp_disabled',
'ip_version': IPV4}]
self.dhcp_disabled[IPV6] = True
self.dhcp_disabled[IPV4] = False
# create port and check extra-dhcp-opts
self._create_port_and_check_dhcp_opts(
ipv6=True, ra_address_mode='dhcpv6-stateless')
# update port and check extra-dhcp-opts
self.extra_dhcp_opts = [
{'opt_value': '2001:4860:4860::8888',
'opt_name': 'dns-server',
'ip_version': IPV6},
{'opt_value': '8.8.8.8',
'opt_name': 'dns-server',
'ip_version': IPV4},
{'opt_value': 'False',
'opt_name': 'dhcp_disabled',
'ip_version': IPV6},
{'opt_value': 'True',
'opt_name': 'dhcp_disabled',
'ip_version': IPV4}]
self.dhcp_disabled[IPV6] = False
self.dhcp_disabled[IPV4] = True
self.port = self.client.update_port(
self.port['id'], extra_dhcp_opts=self.extra_dhcp_opts)['port']
self._check_created_extra_dhcp_opts()

View File

@ -24,14 +24,15 @@
USE_PYTHON3: true USE_PYTHON3: true
NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_tempest) | join(',') }}" NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_tempest) | join(',') }}"
PHYSICAL_NETWORK: public PHYSICAL_NETWORK: public
IMAGE_URLS: https://cloud-images.ubuntu.com/minimal/releases/focal/release/ubuntu-20.04-minimal-cloudimg-amd64.img IMAGE_URLS: https://download.rockylinux.org/pub/rocky/9/images/x86_64/Rocky-9-GenericCloud.latest.x86_64.qcow2
CIRROS_VERSION: 0.6.2 CIRROS_VERSION: 0.6.2
DEFAULT_IMAGE_NAME: cirros-0.6.2-x86_64-uec DEFAULT_IMAGE_NAME: cirros-0.6.2-x86_64-uec
DEFAULT_IMAGE_FILE_NAME: cirros-0.6.2-x86_64-uec.tar.gz DEFAULT_IMAGE_FILE_NAME: cirros-0.6.2-x86_64-uec.tar.gz
ADVANCED_IMAGE_NAME: ubuntu-20.04-minimal-cloudimg-amd64 ADVANCED_IMAGE_NAME: Rocky-9-GenericCloud.latest.x86_64
ADVANCED_INSTANCE_TYPE: ntp_image_256M ADVANCED_INSTANCE_TYPE: ds1G
ADVANCED_INSTANCE_USER: ubuntu ADVANCED_INSTANCE_USER: rocky
CUSTOMIZE_IMAGE: true GLANCE_LIMIT_IMAGE_SIZE_TOTAL: 2000
CUSTOMIZE_IMAGE: false
BUILD_TIMEOUT: 784 BUILD_TIMEOUT: 784
# TODO(lucasagomes): Re-enable MOD_WSGI after # TODO(lucasagomes): Re-enable MOD_WSGI after
# https://bugs.launchpad.net/neutron/+bug/1912359 is implemented # https://bugs.launchpad.net/neutron/+bug/1912359 is implemented