Merge "Rework floating ip functional tests"

This commit is contained in:
Jenkins 2017-05-11 14:25:34 +00:00 committed by Gerrit Code Review
commit 96a3bae3c1

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import random
import re
import uuid
from openstackclient.tests.functional.network.v2 import common
@ -19,8 +19,8 @@ from openstackclient.tests.functional.network.v2 import common
class FloatingIpTests(common.NetworkTests):
"""Functional tests for floating ip"""
SUBNET_NAME = uuid.uuid4().hex
NETWORK_NAME = uuid.uuid4().hex
EXTERNAL_NETWORK_NAME = uuid.uuid4().hex
EXTERNAL_SUBNET_NAME = uuid.uuid4().hex
PRIVATE_NETWORK_NAME = uuid.uuid4().hex
PRIVATE_SUBNET_NAME = uuid.uuid4().hex
ROUTER = uuid.uuid4().hex
@ -30,26 +30,20 @@ class FloatingIpTests(common.NetworkTests):
def setUpClass(cls):
common.NetworkTests.setUpClass()
if cls.haz_network:
# Set up some regex for matching below
cls.re_id = re.compile("id\s+\|\s+(\S+)")
cls.re_floating_ip = re.compile("floating_ip_address\s+\|\s+(\S+)")
cls.re_fixed_ip = re.compile("fixed_ip_address\s+\|\s+(\S+)")
cls.re_description = re.compile("description\s+\|\s+([^|]+?)\s+\|")
cls.re_network_id = re.compile("floating_network_id\s+\|\s+(\S+)")
cls.re_port_id = re.compile("\s+id\s+\|\s+(\S+)")
cls.re_fp_port_id = re.compile("\s+port_id\s+\|\s+(\S+)")
# Create a network for the floating ip
raw_output = cls.openstack(
'network create --external ' + cls.NETWORK_NAME
)
cls.network_id = re.search(cls.re_id, raw_output).group(1)
json_output = json.loads(cls.openstack(
'network create -f json ' +
'--external ' +
cls.EXTERNAL_NETWORK_NAME
))
cls.external_network_id = json_output["id"]
# Create a private network for the port
raw_output = cls.openstack(
'network create ' + cls.PRIVATE_NETWORK_NAME
)
cls.private_network_id = re.search(cls.re_id, raw_output).group(1)
json_output = json.loads(cls.openstack(
'network create -f json ' +
cls.PRIVATE_NETWORK_NAME
))
cls.private_network_id = json_output["id"]
# Try random subnet range for subnet creating
# Because we can not determine ahead of time what subnets are
@ -57,7 +51,7 @@ class FloatingIpTests(common.NetworkTests):
# try 4 times
for i in range(4):
# Make a random subnet
cls.subnet = ".".join(map(
cls.external_subnet = ".".join(map(
str,
(random.randint(0, 223) for _ in range(3))
)) + ".0/26"
@ -67,19 +61,21 @@ class FloatingIpTests(common.NetworkTests):
)) + ".0/26"
try:
# Create a subnet for the network
raw_output = cls.openstack(
'subnet create ' +
'--network ' + cls.NETWORK_NAME + ' ' +
'--subnet-range ' + cls.subnet + ' ' +
cls.SUBNET_NAME
)
json_output = json.loads(cls.openstack(
'subnet create -f json ' +
'--network ' + cls.EXTERNAL_NETWORK_NAME + ' ' +
'--subnet-range ' + cls.external_subnet + ' ' +
cls.EXTERNAL_SUBNET_NAME
))
cls.external_subnet_id = json_output["id"]
# Create a subnet for the private network
priv_raw_output = cls.openstack(
'subnet create ' +
json_output = json.loads(cls.openstack(
'subnet create -f json ' +
'--network ' + cls.PRIVATE_NETWORK_NAME + ' ' +
'--subnet-range ' + cls.private_subnet + ' ' +
cls.PRIVATE_SUBNET_NAME
)
))
cls.private_subnet_id = json_output["id"]
except Exception:
if (i == 3):
# raise the exception at the last time
@ -89,30 +85,21 @@ class FloatingIpTests(common.NetworkTests):
# break and no longer retry if create sucessfully
break
cls.subnet_id = re.search(cls.re_id, raw_output).group(1)
cls.private_subnet_id = re.search(
cls.re_id, priv_raw_output
).group(1)
@classmethod
def tearDownClass(cls):
if cls.haz_network:
raw_output = cls.openstack(
'subnet delete ' + cls.SUBNET_NAME,
del_output = cls.openstack(
'subnet delete ' +
cls.EXTERNAL_SUBNET_NAME + ' ' +
cls.PRIVATE_SUBNET_NAME
)
cls.assertOutput('', raw_output)
raw_output = cls.openstack(
'subnet delete ' + cls.PRIVATE_SUBNET_NAME,
cls.assertOutput('', del_output)
del_output = cls.openstack(
'network delete ' +
cls.EXTERNAL_NETWORK_NAME + ' ' +
cls.PRIVATE_NETWORK_NAME
)
cls.assertOutput('', raw_output)
raw_output = cls.openstack(
'network delete ' + cls.NETWORK_NAME,
)
cls.assertOutput('', raw_output)
raw_output = cls.openstack(
'network delete ' + cls.PRIVATE_NETWORK_NAME,
)
cls.assertOutput('', raw_output)
cls.assertOutput('', del_output)
def setUp(self):
super(FloatingIpTests, self).setUp()
@ -120,146 +107,197 @@ class FloatingIpTests(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
# Verify setup
self.assertIsNotNone(self.external_network_id)
self.assertIsNotNone(self.private_network_id)
self.assertIsNotNone(self.external_subnet_id)
self.assertIsNotNone(self.private_subnet_id)
def test_floating_ip_delete(self):
"""Test create, delete multiple"""
raw_output = self.openstack(
'floating ip create ' +
json_output = json.loads(self.openstack(
'floating ip create -f json ' +
'--description aaaa ' +
self.NETWORK_NAME
)
re_ip = re.search(self.re_floating_ip, raw_output)
self.assertIsNotNone(re_ip)
ip1 = re_ip.group(1)
self.EXTERNAL_NETWORK_NAME
))
self.assertIsNotNone(json_output["id"])
ip1 = json_output["id"]
self.assertEqual(
'aaaa',
re.search(self.re_description, raw_output).group(1),
json_output["description"],
)
raw_output = self.openstack(
'floating ip create ' +
json_output = json.loads(self.openstack(
'floating ip create -f json ' +
'--description bbbb ' +
self.NETWORK_NAME
)
ip2 = re.search(self.re_floating_ip, raw_output).group(1)
self.EXTERNAL_NETWORK_NAME
))
self.assertIsNotNone(json_output["id"])
ip2 = json_output["id"]
self.assertEqual(
'bbbb',
re.search(self.re_description, raw_output).group(1),
json_output["description"],
)
# Clean up after ourselves
raw_output = self.openstack('floating ip delete ' + ip1 + ' ' + ip2)
self.assertOutput('', raw_output)
del_output = self.openstack('floating ip delete ' + ip1 + ' ' + ip2)
self.assertOutput('', del_output)
self.assertIsNotNone(json_output["floating_network_id"])
def test_floating_ip_list(self):
"""Test create defaults, list filters, delete"""
raw_output = self.openstack(
'floating ip create ' +
json_output = json.loads(self.openstack(
'floating ip create -f json ' +
'--description aaaa ' +
self.NETWORK_NAME
)
re_ip = re.search(self.re_floating_ip, raw_output)
self.assertIsNotNone(re_ip)
ip1 = re_ip.group(1)
self.EXTERNAL_NETWORK_NAME
))
self.assertIsNotNone(json_output["id"])
ip1 = json_output["id"]
self.addCleanup(self.openstack, 'floating ip delete ' + ip1)
self.assertEqual(
'aaaa',
re.search(self.re_description, raw_output).group(1),
json_output["description"],
)
self.assertIsNotNone(re.search(self.re_network_id, raw_output))
self.assertIsNotNone(json_output["floating_network_id"])
fip1 = json_output["floating_ip_address"]
raw_output = self.openstack(
'floating ip create ' +
json_output = json.loads(self.openstack(
'floating ip create -f json ' +
'--description bbbb ' +
self.NETWORK_NAME
)
ip2 = re.search(self.re_floating_ip, raw_output).group(1)
self.EXTERNAL_NETWORK_NAME
))
self.assertIsNotNone(json_output["id"])
ip2 = json_output["id"]
self.addCleanup(self.openstack, 'floating ip delete ' + ip2)
self.assertEqual(
'bbbb',
re.search(self.re_description, raw_output).group(1),
json_output["description"],
)
self.assertIsNotNone(json_output["floating_network_id"])
fip2 = json_output["floating_ip_address"]
# Test list
raw_output = self.openstack('floating ip list')
self.assertIsNotNone(re.search("\|\s+" + ip1 + "\s+\|", raw_output))
self.assertIsNotNone(re.search("\|\s+" + ip2 + "\s+\|", raw_output))
json_output = json.loads(self.openstack(
'floating ip list -f json'
))
fip_map = {
item.get('ID'):
item.get('Floating IP Address') for item in json_output
}
# self.assertEqual(item_map, json_output)
self.assertIn(ip1, fip_map.keys())
self.assertIn(ip2, fip_map.keys())
self.assertIn(fip1, fip_map.values())
self.assertIn(fip2, fip_map.values())
# Test list --long
raw_output = self.openstack('floating ip list --long')
self.assertIsNotNone(re.search("\|\s+" + ip1 + "\s+\|", raw_output))
self.assertIsNotNone(re.search("\|\s+" + ip2 + "\s+\|", raw_output))
json_output = json.loads(self.openstack(
'floating ip list -f json ' +
'--long'
))
fip_map = {
item.get('ID'):
item.get('Floating IP Address') for item in json_output
}
self.assertIn(ip1, fip_map.keys())
self.assertIn(ip2, fip_map.keys())
self.assertIn(fip1, fip_map.values())
self.assertIn(fip2, fip_map.values())
desc_map = {
item.get('ID'): item.get('Description') for item in json_output
}
self.assertIn('aaaa', desc_map.values())
self.assertIn('bbbb', desc_map.values())
# TODO(dtroyer): add more filter tests
def test_floating_ip_show(self):
"""Test show"""
raw_output = self.openstack(
'floating ip create ' +
'--description shosho ' +
# '--fixed-ip-address 1.2.3.4 ' +
self.NETWORK_NAME
)
re_ip = re.search(self.re_floating_ip, raw_output)
self.assertIsNotNone(re_ip)
ip = re_ip.group(1)
raw_output = self.openstack('floating ip show ' + ip)
self.addCleanup(self.openstack, 'floating ip delete ' + ip)
json_output = json.loads(self.openstack(
'floating ip show -f json ' +
ip1
))
self.assertIsNotNone(json_output["id"])
self.assertEqual(
'shosho',
re.search(self.re_description, raw_output).group(1),
ip1,
json_output["id"],
)
self.assertEqual(
'aaaa',
json_output["description"],
)
self.assertIsNotNone(json_output["floating_network_id"])
self.assertEqual(
fip1,
json_output["floating_ip_address"],
)
# TODO(dtroyer): not working???
# self.assertEqual(
# '1.2.3.4',
# re.search(self.re_floating_ip, raw_output).group(1),
# )
self.assertIsNotNone(re.search(self.re_network_id, raw_output))
def test_floating_ip_set_and_unset_port(self):
"""Test Floating IP Set and Unset port"""
raw_output = self.openstack(
'floating ip create ' +
'--description shosho ' +
self.NETWORK_NAME
json_output = json.loads(self.openstack(
'floating ip create -f json ' +
'--description aaaa ' +
self.EXTERNAL_NETWORK_NAME
))
self.assertIsNotNone(json_output["id"])
ip1 = json_output["id"]
self.addCleanup(self.openstack, 'floating ip delete ' + ip1)
self.assertEqual(
'aaaa',
json_output["description"],
)
re_ip = re.search(self.re_floating_ip, raw_output)
fp_ip = re_ip.group(1)
self.addCleanup(self.openstack, 'floating ip delete ' + fp_ip)
self.assertIsNotNone(fp_ip)
raw_output1 = self.openstack(
'port create --network ' + self.PRIVATE_NETWORK_NAME
+ ' --fixed-ip subnet=' + self.PRIVATE_SUBNET_NAME +
' ' + self.PORT_NAME
)
re_port_id = re.search(self.re_port_id, raw_output1)
self.assertIsNotNone(re_port_id)
port_id = re_port_id.group(1)
json_output = json.loads(self.openstack(
'port create -f json ' +
'--network ' + self.PRIVATE_NETWORK_NAME + ' ' +
'--fixed-ip subnet=' + self.PRIVATE_SUBNET_NAME + ' ' +
self.PORT_NAME
))
self.assertIsNotNone(json_output["id"])
port_id = json_output["id"]
router = self.openstack('router create ' + self.ROUTER)
self.assertIsNotNone(router)
json_output = json.loads(self.openstack(
'router create -f json ' +
self.ROUTER
))
self.assertIsNotNone(json_output["id"])
self.addCleanup(self.openstack, 'router delete ' + self.ROUTER)
self.openstack('router add port ' + self.ROUTER +
' ' + port_id)
self.openstack('router set --external-gateway ' + self.NETWORK_NAME +
' ' + self.ROUTER)
self.openstack(
'router add port ' +
self.ROUTER + ' ' +
port_id
)
self.addCleanup(self.openstack, 'router unset --external-gateway '
+ self.ROUTER)
self.addCleanup(self.openstack, 'router remove port ' + self.ROUTER
+ ' ' + port_id)
self.openstack(
'router set ' +
'--external-gateway ' + self.EXTERNAL_NETWORK_NAME + ' ' +
self.ROUTER
)
self.addCleanup(
self.openstack,
'router unset --external-gateway ' + self.ROUTER,
)
self.addCleanup(
self.openstack,
'router remove port ' + self.ROUTER + ' ' + port_id,
)
raw_output = self.openstack(
self.openstack(
'floating ip set ' +
fp_ip + ' --port ' + port_id)
self.addCleanup(self.openstack, 'floating ip unset --port ' + fp_ip)
'--port ' + port_id + ' ' +
ip1
)
self.addCleanup(
self.openstack,
'floating ip unset --port ' + ip1,
)
show_output = self.openstack(
'floating ip show ' + fp_ip)
json_output = json.loads(self.openstack(
'floating ip show -f json ' +
ip1
))
self.assertEqual(
port_id,
re.search(self.re_fp_port_id, show_output).group(1))
json_output["port_id"],
)