octavia nsxv lb fixes source_ip pool & fortinet

Change-Id: I820aab0d891252b299dac2b01877706ef7190d8f
This commit is contained in:
dkumbhar 2021-08-12 16:51:12 +00:00
parent a0eab99359
commit 600b4b3322
2 changed files with 2431 additions and 1869 deletions

View File

@ -0,0 +1,562 @@
# Copyright 2017 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import fwaas_client as FWAASC
from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.services import nsxv_client
CONF = config.CONF
class TestFortincatPortCase(feature_manager.FeatureManager):
"""TestFortincatPortCase Scenario
"""
@classmethod
def setup_clients(cls):
super(TestFortincatPortCase, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
cls.cmgr_alt = cls.get_client_manager('alt')
cls.routers_client = cls.cmgr_adm.routers_client
cls.networks_client = cls.cmgr_adm.networks_client
cls.subnets_client = cls.cmgr_adm.subnets_client
cls.sec_rule_client = cls.cmgr_adm.security_group_rules_client
cls.sec_client = cls.cmgr_adm.security_groups_client
cls.fwaasv1_client = FWAASC.get_client(cls.cmgr_adm)
@classmethod
def resource_setup(cls):
super(TestFortincatPortCase, cls).resource_setup()
if CONF.network.backend == "nsxv3" \
or CONF.network.backend == "nsxp":
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
elif CONF.network.backend == "nsxv":
manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}",
CONF.nsxv.manager_uri).group(0)
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
cls.namestart = 'lbaas-ops'
cls.poke_counters = 12
cls.hm_delay = 4
cls.hm_max_retries = 3
cls.hm_timeout = 10
cls.server_names = []
cls.loadbalancer = None
cls.vip_fip = None
cls.web_service_start_delay = 2.5
def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
show_body = self.routers_client.show_router(router_id)
actual_ext_gw_info = show_body['router']['external_gateway_info']
if exp_ext_gw_info is None:
self.assertIsNone(actual_ext_gw_info)
return
# Verify only keys passed in exp_ext_gw_info
for k, v in exp_ext_gw_info.items():
self.assertEqual(v, actual_ext_gw_info[k])
def create_topo_single_network(self, namestart, create_instance=True,
set_gateway=True, cidr=None,
enable_dhcp=True, **kwargs):
"""
Create Topo where 1 logical switches which is
connected via tier-1 router.
"""
rtr_name = data_utils.rand_name(name='tempest-router')
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
router_state = self.create_topology_router(rtr_name,
set_gateway=set_gateway,
**kwargs)
network_state = self.create_topology_network(network_name)
subnet_state = self.create_topology_subnet(
subnet_name, network_state, router_id=router_state["id"],
cidr=cidr)
if create_instance:
image_id = self.get_glance_image_id(["cirros", "esx"])
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id)
self.create_topology_instance(
"state_vm_2", [network_state],
create_floating_ip=True, image_id=image_id)
topology_dict = dict(router_state=router_state,
network_state=network_state,
subnet_state=subnet_state)
return topology_dict
def verify_ping_to_fip_from_ext_vm(self, server_details):
self.using_floating_ip_check_server_and_project_network_connectivity(
server_details)
def verify_ping_own_fip(self, server, fip_index=0):
fip = server["floating_ips"][fip_index]["floating_ip_address"]
client = self.verify_server_ssh(server, floating_ip=fip)
ping_cmd = "ping -c 1 %s " % fip
self.exec_cmd_on_server_using_fip(ping_cmd, ssh_client=client)
def _conv_switch_prof_to_dict(self, switch_profiles):
switch_prof_dict = {}
for i in range(len(switch_profiles)):
switch_prof_dict.update(
{switch_profiles[i]['key']: switch_profiles[i]['value']})
return switch_prof_dict
def get_fixed_ips_port(self, port=None):
port_id = port.get('port')['id']
ports = self.nsx.get_logical_ports()
fixed_ips = []
for port in ports:
if 'tags' in port:
for tag in port['tags']:
if tag['scope'] == 'policyPath' and \
tag['tag'].split('/')[-1] == port_id and \
port['address_bindings']:
fix_ips = port['address_bindings']
fixed_ips = [ip['ip_address'] for ip in fix_ips]
break
return fixed_ips
@decorators.idempotent_id('1207348d-91cd-8906-b217-98844caa57c1')
def test_create_ipv6_non_dhcp_port_with_two_fixed_ip_subnet_option(self):
rtr_name = data_utils.rand_name(name='tempest-router')
network_ipv4 = data_utils.rand_name(name='tempest-ipv4')
network_ipv6 = data_utils.rand_name(name='tempest-ipv6')
subnet_ipv4 = data_utils.rand_name(name='tempest-ipv4')
subnet_ipv6 = data_utils.rand_name(name='tempest-ipv6')
nw_client = self.cmgr_adm.networks_client
rtr_client = self.cmgr_adm.routers_client
router = self.create_topology_router(rtr_name,
set_gateway=True,
routers_client=rtr_client)
network_ipv4 = self.create_topology_network(network_ipv4,
networks_client=nw_client)
network_ipv6 = self.create_topology_network(network_ipv6,
networks_client=nw_client)
self.create_topology_subnet(subnet_ipv4,
network_ipv4,
router_id=router["id"],
cidr='10.198.1.0/24')
kwargs = {"enable_dhcp": "False"}
ipv6_subnet = self.create_topology_subnet(subnet_ipv6,
network_ipv6,
ip_version=6,
router_id=router["id"],
cidr='2001:db3:a583::/64',
**kwargs)
fix_ips = [{'subnet_id': ipv6_subnet.get('id'),
'ip_address': '2001:db3:a583::11'},
{'subnet_id': ipv6_subnet.get('id'),
'ip_address': '2001:db3:a583::12'}]
args1 = {'fixed_ips': fix_ips, 'network_id': network_ipv6['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertIn('2001:db3:a583::11', fixed_ip_list)
self.assertIn('2001:db3:a583::12', fixed_ip_list)
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertIn('2001:db3:a583::11', fixed_ip_list)
self.assertIn('2001:db3:a583::12', fixed_ip_list)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('1207348d-91cd-8906-b217-98844caa57c1')
def test_create_non_dhcp_port_two_fixed_ip_only_subnet_option(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False,
enable_dhcp=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
fix_ips = [{'subnet_id': subnet_state.get(
'id')},
{'subnet_id': subnet_state.get(
'id')}]
args1 = {'fixed_ips': fix_ips, 'network_id': network_state['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(len(fixed_ip_list) == 2)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(len(fixed_ip_list) == 2)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('1207348d-91cd-8906-b217-99844caa57c3')
def test_create_dhcp_port_with_two_fixed_ip_only_subnet_id_option(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False,
enable_dhcp=True)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
fix_ips = [{'subnet_id': subnet_state.get(
'id')},
{'subnet_id': subnet_state.get(
'id')}]
args1 = {'fixed_ips': fix_ips, 'network_id': network_state['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(len(fixed_ip_list) == 2)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(len(fixed_ip_list) == 2)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('1207348d-91cd-8906-b217-98844caa57c3')
def test_create_non_dhcp_port_two_fixed_ip_only_ipaddress_option(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False,
enable_dhcp=False)
network_state = topology_dict['network_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ips = [{'ip_address': network_cidr[0] + '.11'},
{'ip_address': network_cidr[0] + '.12'}]
args1 = {'fixed_ips': fix_ips, 'network_id': network_state['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(len(fixed_ip_list) == 2)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(len(fixed_ip_list) == 2)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('4207348d-91cd-8906-b217-98844caa57c4')
def test_create_dhcp_port_two_fixed_ip_only_ipaddress_option(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False,
enable_dhcp=True)
network_state = topology_dict['network_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ips = [{'ip_address': network_cidr[0] + '.11'},
{'ip_address': network_cidr[0] + '.12'}]
args1 = {'fixed_ips': fix_ips, 'network_id': network_state['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(len(fixed_ip_list) == 2)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(len(fixed_ip_list) == 2)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('1207349c-91cc-8906-b217-98844caa57c1')
def test_create_non_dhcp_port_one_fixed_ip_subnet_id_and_ip_address(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False,
enable_dhcp=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.11'}]
fix_ips = [{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.11'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.12'}]
args1 = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
args2 = {'fixed_ips': fix_ips}
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
# Update port
port_detail = self.cmgr_adm.ports_client.update_port(port['id'],
**args2)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Delete second fixed ip from port
args1.pop('network_id', None)
port_detail = self.cmgr_adm.ports_client.update_port(port['id'],
**args1)
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
# Verify nsxt backend has only one ip
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertFalse(network_cidr[0] + '.12' in fixed_ip_list)
@decorators.idempotent_id('1207349c-91cc-8906-b217-98844caa57d2')
def test_create_dhcp_port_wit_one_fixed_ip_subnet_id_and_ip_address(self):
topology = self.create_topo_single_network("instance_port",
create_instance=False)
network_state = topology['network_state']
subnet_state = topology['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.11'}]
args1 = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
args1['security_groups'] = [self.sg['id']]
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
image_id = self.get_glance_image_id(['cirros'])
sec_grps = [{'name': self.sg['name']}]
vm1 = self.create_topology_instance("state_vm_1", port=port,
create_floating_ip=True,
image_id=image_id,
security_groups=sec_grps,
clients=self.cmgr_adm)
self.verify_ping_own_fip(vm1)
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
@decorators.idempotent_id('1207349c-92cc-8906-b217-98844caa57d2')
def test_create_dhcp_port_with_two_fixed_ip_subnet_id_and_ip_address(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.11'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.12'}]
args1 = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
args1['security_groups'] = [self.sg['id']]
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
self.addCleanup(self.cmgr_adm.ports_client.delete_port, port['id'])
image_id = self.get_glance_image_id(['cirros'])
sec_grps = [{'name': self.sg['name']}]
vm1 = self.create_topology_instance("state_vm_1", port=port,
create_floating_ip=True,
image_id=image_id,
security_groups=sec_grps,
clients=self.cmgr_adm)
public_ip1 = vm1['floating_ips'][0]['floating_ip_address']
self.verify_ping_own_fip(vm1)
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
ssh_src1 = self._get_remote_client(public_ip1, username='cirros',
use_password=True)
self._assign_ipv4_address(ssh_src1, 'eth0:1', network_cidr[0] + '.12')
self.verify_ping_own_fip(vm1, fip_index=1)
@decorators.idempotent_id('1207349c-93cc-8906-b217-98844caa57d2')
def test_update_dhcp_delete_port_two_fixed_ip_subnt_and_ip_addr(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.11'}]
fix_ips = [{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.11'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.12'}]
args1 = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
args2 = {'fixed_ips': fix_ips}
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
args1['security_groups'] = [self.sg['id']]
port_detail = self.cmgr_adm.ports_client.create_port(**args1)
port = port_detail['port']
image_id = self.get_glance_image_id(['cirros'])
sec_grps = [{'name': self.sg['name']}]
# Update port
port_detail = self.cmgr_adm.ports_client.update_port(port['id'],
**args2)
port = port_detail['port']
vm1 = self.create_topology_instance("state_vm_1", port=port,
create_floating_ip=True,
image_id=image_id,
security_groups=sec_grps,
clients=self.cmgr_adm)
self.verify_ping_own_fip(vm1)
public_ip1 = vm1['floating_ips'][0]['floating_ip_address']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Assign second fixed_ip & verify traffic to floating i[
ssh_src1 = self._get_remote_client(public_ip1, username='cirros',
use_password=True)
self._assign_ipv4_address(ssh_src1, 'eth0:1',
network_cidr[0] + '.12')
self.addCleanup(self.cmgr_adm.ports_client.delete_port,
port['id'])
self.verify_ping_own_fip(vm1, fip_index=1)
# Delete second fixed ip from port
args1.pop('network_id', None)
port_detail = self.cmgr_adm.ports_client.update_port(port['id'],
**args1)
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
# Verify nsxt backend has only one ip
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertFalse(network_cidr[0] + '.12' in fixed_ip_list)
@decorators.idempotent_id('1207349c-91cc-8906-b217-98844caa57d1')
def test_create_dhcp_port_with_three_fixed_ip_subnet_and_ip_addr(self):
"""
Check it should allow to create port with two
fixed ips with subnet_id & ip_address both option .
"""
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.11'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.12'}]
args = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
port_detail = self.cmgr_adm.ports_client.create_port(**args)
port = port_detail['port']
self.assertEqual("ACTIVE", port['status'])
fixed_ip_list = [ip['ip_address'] for ip in port['fixed_ips']]
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Verify fixed ip on backend & neutron
fixed_ip_list = self.get_fixed_ips_port(port_detail)
self.assertTrue(network_cidr[0] + '.11' in fixed_ip_list)
self.assertTrue(network_cidr[0] + '.12' in fixed_ip_list)
# Deploy VM
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
image_id = self.get_glance_image_id(['cirros', "esx"])
sec_grps = [{'name': self.sg['name']}]
vm1 = self.create_topology_instance("state_vm_1", port=port,
create_floating_ip=True,
image_id=image_id,
security_groups=sec_grps,
clients=self.cmgr_adm)
public_ip1 = vm1['floating_ips'][0]['floating_ip_address']
public_ip2 = vm1['floating_ips'][1]['floating_ip_address']
self.verify_ping_own_fip(public_ip1)
self.verify_ping_own_fip(public_ip2)
ssh_src1 = self._get_remote_client(public_ip1, username='cirros',
use_password=True)
self._assign_ipv4_address(ssh_src1, 'eth0:1', network_cidr[0] + '.12')
@decorators.idempotent_id('4548338b-12aa-0126-b217-12454cab45c3')
def test_update_lb_vip_port_with_allowed_address_pair(self):
"""
Check it should advertise static route with no-not
router
"""
topo_dict = self.create_topo_single_network('test_lb_port',
create_instance=False)
subnet_state = topo_dict['subnet_state']
self.loadbalancer = self.load_balancers_admin_client.\
create_load_balancer(
name='test_lb',
vip_subnet_id=subnet_state['id'])['loadbalancer']
self.addCleanup(
self.load_balancers_admin_client.delete_load_balancer,
self.loadbalancer['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
self.loadbalancer['id'])
allowed_address_pairs = [{'ip_address': '10.0.0.10'}]
self.assertRaises(exceptions.BadRequest,
self.cmgr_adm.ports_client.update_port,
self.loadbalancer['vip_port_id'],
allowed_address_pairs=allowed_address_pairs)
def _assign_ipv4_address(self, ssh_src, interface_name, ip_address):
ssh_src.exec_command("sudo /sbin/ip -4 addr add %s/24 dev %s \
" % (ip_address, interface_name))
ssh_src.exec_command("sudo /sbin/ip link set %s up" % (interface_name))
def _assign_ipv6_address(self, ssh_src, interface_name, ip_address):
ssh_src.exec_command("sudo /sbin/ip -6 addr add %s/64 dev %s \
" % (ip_address, interface_name))
ssh_src.exec_command("sudo /sbin/ip link set %s up" % (interface_name))