Merge "improve test_db_plugin so it can be leveraged by extension tests"

This commit is contained in:
Jenkins 2012-08-14 00:22:49 +00:00 committed by Gerrit Code Review
commit 4d9449a3da

View File

@ -33,6 +33,7 @@ from quantum import context
from quantum.db import api as db
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.tests.unit import test_extensions
from quantum.tests.unit.testlib_api import create_request
from quantum.wsgi import Serializer, JSONDeserializer
@ -49,7 +50,8 @@ def etcdir(*p):
class QuantumDbPluginV2TestCase(unittest2.TestCase):
def setUp(self):
def setUp(self, plugin=None):
super(QuantumDbPluginV2TestCase, self).setUp()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
@ -85,6 +87,10 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
self._skip_native_bulk = not _is_native_bulk_supported()
ext_mgr = test_config.get('extension_manager', None)
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def tearDown(self):
super(QuantumDbPluginV2TestCase, self).tearDown()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
@ -94,14 +100,17 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
cfg.CONF.reset()
def _req(self, method, resource, data=None, fmt='json',
id=None, params=None):
if id:
id=None, params=None, action=None):
if id and action:
path = '/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
elif id:
path = '/%(resource)s/%(id)s.%(fmt)s' % locals()
else:
path = '/%(resource)s.%(fmt)s' % locals()
content_type = 'application/%s' % fmt
body = None
if data:
if data is not None: # empty dict is valid
body = Serializer().serialize(data, content_type)
return create_request(path,
body,
@ -124,6 +133,9 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
def new_update_request(self, resource, data, id, fmt='json'):
return self._req('PUT', resource, data, fmt, id=id)
def new_action_request(self, resource, data, id, action, fmt='json'):
return self._req('PUT', resource, data, fmt, id=id, action=action)
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
data = self._deserializers[ctype].deserialize(response.body)['body']
@ -284,9 +296,36 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
res = self._create_port(fmt, net_id, **kwargs)
return self.deserialize(fmt, res)
def _delete(self, collection, id):
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports']:
return self.api
else:
return self.ext_api
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code):
req = self.new_delete_request(collection, id)
req.get_response(self.api)
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(res.status_int, expected_code)
def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
req = self.new_show_request(resource, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code):
req = self.new_update_request(resource, new_data, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
def _list(self, resource):
req = self.new_list_request(resource)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize('json', res)
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """
@ -526,6 +565,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
for k, v in keys:
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
self._delete('ports', port['port']['id'])
def test_create_ports_bulk_native(self):
if self._skip_native_bulk:
@ -534,6 +574,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize('json', res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_emulated(self):
real_has_attr = hasattr
@ -550,6 +592,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize('json', res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_wrong_input(self):
with self.network() as net:
@ -654,6 +698,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
_list_and_test_ports(1, [port1], tenant_id='tenant_1')
# Tenant_2 request - must return single port
_list_and_test_ports(1, [port2], tenant_id='tenant_2')
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_show_port(self):
with self.port() as port:
@ -877,6 +923,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
def test_requested_subnet_id_not_on_network(self):
fmt = 'json'
@ -921,14 +968,16 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::2')
self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
res = self._create_port(fmt, net_id=net_id)
port3 = self.deserialize(fmt, res)
port4 = self.deserialize(fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port3['port']['fixed_ips']
ips = port4['port']['fixed_ips']
self.assertEquals(len(ips), 2)
self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::3')
self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def test_range_allocation(self):
fmt = 'json'
@ -951,6 +1000,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
@ -970,6 +1021,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
def test_requested_invalid_fixed_ips(self):
fmt = 'json'
@ -1025,6 +1077,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
@ -1035,21 +1088,27 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.5')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP's
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
for a in allocated:
res = self._create_port(fmt, net_id=net_id)
port2 = self.deserialize(fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], a)
self.assertEquals(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_requested_ips_only(self):
fmt = 'json'
with self.subnet() as subnet:
@ -1060,16 +1119,20 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
'10.0.0.3', '10.0.0.17', '10.0.0.19']
ports_to_delete = []
for i in ips_only:
kwargs = {"fixed_ips": [{'ip_address': i}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port = self.deserialize(fmt, res)
ports_to_delete.append(port)
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], i)
self.assertEquals(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_recycling(self):
fmt = 'json'
@ -1095,6 +1158,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.1.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
def test_invalid_admin_state(self):
with self.network() as network:
@ -1181,56 +1245,64 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
def test_update_network_set_not_shared_single_tenant(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertFalse(res['network']['shared'])
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_other_tenant_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
res2 = self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
port2 = self.deserialize('json', res2)
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
self._create_subnet('json',
network['network']['id'],
'10.0.0.0/24',
@ -1243,6 +1315,9 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_create_networks_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")