Merge "refactor LoadBalancerPluginDbTestCase setUp()"

This commit is contained in:
Jenkins 2013-02-24 00:15:10 +00:00 committed by Gerrit Code Review
commit e116a036f5
2 changed files with 145 additions and 258 deletions

View File

@ -32,6 +32,8 @@ from quantum.extensions import loadbalancer
from quantum.manager import QuantumManager
from quantum.plugins.common import constants
from quantum.plugins.services.loadbalancer import loadbalancerPlugin
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
from quantum.tests.unit.testlib_api import create_request
from quantum import wsgi
@ -54,39 +56,21 @@ def etcdir(*p):
return os.path.join(ETCDIR, *p)
class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
resource_prefix_map = dict(
(k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self, core_plugin=None, lb_plugin=None):
super(LoadBalancerPluginDbTestCase, self).setUp()
service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
db._ENGINE = None
db._MAKER = None
super(LoadBalancerPluginDbTestCase, self).setUp(
service_plugins=service_plugins
)
QuantumManager._instance = None
PluginAwareExtensionManager._instance = None
self._attribute_map_bk = {}
self._attribute_map_bk = loadbalancer.RESOURCE_ATTRIBUTE_MAP.copy()
self._tenant_id = "test-tenant"
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
if not core_plugin:
core_plugin = test_config.get('plugin_name_v2',
DB_CORE_PLUGIN_KLASS)
if not lb_plugin:
lb_plugin = test_config.get('lb_plugin_name', DB_LB_PLUGIN_KLASS)
# point config file to: quantum/tests/etc/quantum.conf.test
args = ['--config-file', etcdir('quantum.conf.test')]
config.parse(args=args)
# Update the plugin
service_plugins = [lb_plugin]
cfg.CONF.set_override('core_plugin', core_plugin)
cfg.CONF.set_override('service_plugins', service_plugins)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
plugin = loadbalancerPlugin.LoadBalancerPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
@ -94,73 +78,6 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
super(LoadBalancerPluginDbTestCase, self).setUp()
def tearDown(self):
super(LoadBalancerPluginDbTestCase, self).tearDown()
self.api = None
self._skip_native_bulk = None
self.ext_api = None
db.clear_db()
db._ENGINE = None
db._MAKER = None
cfg.CONF.reset()
# Restore the original attribute map
loadbalancer.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
def _req(self, method, resource, data=None, fmt=None,
id=None, subresource=None, sub_id=None, params=None, action=None):
if not fmt:
fmt = self.fmt
if id and action:
path = '/lb/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
elif id and subresource and sub_id:
path = (
'/lb/%(resource)s/%(id)s/%(subresource)s/'
'%(sub_id)s.%(fmt)s') % locals()
elif id and subresource:
path = (
'/lb/%(resource)s/%(id)s/'
'%(subresource)s.%(fmt)s') % locals()
elif id:
path = '/lb/%(resource)s/%(id)s.%(fmt)s' % locals()
else:
path = '/lb/%(resource)s.%(fmt)s' % locals()
content_type = 'application/%s' % fmt
body = None
if data is not None: # empty dict is valid
body = wsgi.Serializer(
attributes.get_attr_metadata()).serialize(data, content_type)
req = create_request(path,
body,
content_type,
method,
query_string=params)
return req
def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None):
return self._req('POST', resource, data, fmt, id=id,
subresource=subresource)
def new_list_request(self, resource, fmt=None, params=None):
return self._req('GET', resource, None, fmt, params=params)
def new_show_request(self, resource, id, fmt=None, action=None,
subresource=None, sub_id=None):
return self._req('GET', resource, None, fmt, id=id, action=action,
subresource=subresource, sub_id=sub_id)
def new_delete_request(self, resource, id, fmt=None,
subresource=None, sub_id=None):
return self._req('DELETE', resource, None, fmt, id=id,
subresource=subresource, sub_id=sub_id)
def new_update_request(self, resource, data, id, fmt=None):
return self._req('PUT', resource, data, fmt, id=id)
def _create_vip(self, fmt, name, pool_id, protocol, port, admin_state_up,
expected_res_status=None, **kwargs):
@ -246,107 +163,6 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
else:
return self.ext_api
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code):
req = self.new_delete_request(collection, id)
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(res.status_int, expected_code)
def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
req = self.new_show_request(resource, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize(res)
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code):
req = self.new_update_request(resource, new_data, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize(res)
def _list(self, resource, fmt=None, query_params=None):
req = self.new_list_request(resource, fmt, query_params)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(res)
def _test_list_with_sort(self, collection, items, sorts, query_params=''):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
direction)
req = self.new_list_request('%ss' % collection,
params=query_str)
api = self._api_for_resource('%ss' % collection)
res = self.deserialize(req.get_response(api))
collection = collection.replace('-', '_')
expected_res = [item[collection]['id'] for item in items]
self.assertListEqual([n['id'] for n in res["%ss" % collection]],
expected_res)
def _test_list_with_pagination(self, collection, items, sort,
limit, expected_page_num, query_params=''):
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&sort_key=%s&"
"sort_dir=%s") % (limit, sort[0], sort[1])
req = self.new_list_request("%ss" % collection, params=query_str)
items_res = []
page_num = 0
api = self._api_for_resource('%ss' % collection)
collection = collection.replace('-', '_')
while req:
page_num = page_num + 1
res = self.deserialize(req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
items_res = items_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'next':
req = create_request(link['href'],
'', 'application/json')
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
self.assertListEqual([n['id'] for n in items_res],
[item[collection]['id'] for item in items])
def _test_list_with_pagination_reverse(self, collection, items, sort,
limit, expected_page_num,
query_params=''):
resources = '%ss' % collection
collection = collection.replace('-', '_')
api = self._api_for_resource(resources)
marker = items[-1][collection]['id']
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&page_reverse=True&"
"sort_key=%s&sort_dir=%s&"
"marker=%s") % (limit, sort[0], sort[1],
marker)
req = self.new_list_request(resources, params=query_str)
item_res = [items[-1][collection]]
page_num = 0
while req:
page_num = page_num + 1
res = self.deserialize(req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
res["%ss" % collection].reverse()
item_res = item_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
for link in res['%ss_links' % collection]:
if link['rel'] == 'previous':
req = create_request(link['href'],
'', 'application/json')
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
expected_res = [item[collection]['id'] for item in items]
expected_res.reverse()
self.assertListEqual([n['id'] for n in item_res],
expected_res)
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None,
protocol='HTTP', port=80, admin_state_up=True, no_delete=False,
@ -364,7 +180,7 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
admin_state_up,
address=address,
**kwargs)
vip = self.deserialize(res)
vip = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield vip
@ -380,7 +196,7 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
admin_state_up,
address=address,
**kwargs)
vip = self.deserialize(res)
vip = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield vip
@ -399,7 +215,7 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
protocol,
admin_state_up,
**kwargs)
pool = self.deserialize(res)
pool = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield pool
@ -417,7 +233,7 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
port,
admin_state_up,
**kwargs)
member = self.deserialize(res)
member = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield member
@ -438,7 +254,7 @@ class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
max_retries,
admin_state_up,
**kwargs)
health_monitor = self.deserialize(res)
health_monitor = self.deserialize(fmt or self.fmt, res)
the_health_monitor = health_monitor['health_monitor']
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
@ -561,7 +377,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
# Try resetting session_persistence
req = self.new_update_request('vips', update_info, v['vip']['id'])
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
# If session persistence has been removed, it won't be present in
# the response.
@ -585,7 +401,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
'cookie_name': "jesssionId"},
'admin_state_up': False}}
req = self.new_update_request('vips', data, vip['vip']['id'])
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vip'][k], v)
@ -610,7 +426,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
with self.vip(name=name) as vip:
req = self.new_show_request('vips',
vip['vip']['id'])
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vip'][k], v)
@ -626,7 +442,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
('status', 'PENDING_CREATE')]
with self.vip(name=name):
req = self.new_list_request('vips')
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vips'][0][k], v)
@ -692,9 +508,12 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('pools',
pool_id,
fmt=self.fmt)
pool_updated = self.deserialize(req.get_response(self.ext_api))
pool_updated = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
member1 = self.deserialize(res1)
member1 = self.deserialize(self.fmt, res1)
self.assertEqual(member1['member']['id'],
pool_updated['pool']['members'][0])
self.assertEqual(len(pool_updated['pool']['members']), 1)
@ -731,7 +550,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('pools',
pool['pool']['id'],
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['pool'][k], v)
@ -774,7 +593,9 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
pool_id,
fmt=self.fmt)
pool_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
self.assertIn(member1['member']['id'],
pool_update['pool']['members'])
self.assertIn(member2['member']['id'],
@ -795,14 +616,18 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
pool1['pool']['id'],
fmt=self.fmt)
pool1_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
self.assertEqual(len(pool1_update['pool']['members']), 1)
req = self.new_show_request('pools',
pool2['pool']['id'],
fmt=self.fmt)
pool2_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
self.assertEqual(len(pool1_update['pool']['members']), 1)
self.assertEqual(len(pool2_update['pool']['members']), 0)
@ -812,7 +637,10 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_update_request('members',
data,
member['member']['id'])
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
for k, v in keys:
self.assertEqual(res['member'][k], v)
@ -820,13 +648,17 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
pool1['pool']['id'],
fmt=self.fmt)
pool1_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
req = self.new_show_request('pools',
pool2['pool']['id'],
fmt=self.fmt)
pool2_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
self.assertEqual(len(pool2_update['pool']['members']), 1)
self.assertEqual(len(pool1_update['pool']['members']), 0)
@ -845,7 +677,9 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
pool_id,
fmt=self.fmt)
pool_update = self.deserialize(
req.get_response(self.ext_api))
self.fmt,
req.get_response(self.ext_api)
)
self.assertEqual(len(pool_update['pool']['members']), 0)
def test_show_member(self):
@ -861,7 +695,10 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('members',
member['member']['id'],
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
for k, v in keys:
self.assertEqual(res['member'][k], v)
@ -931,7 +768,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_update_request("health_monitors",
data,
monitor['health_monitor']['id'])
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
@ -954,7 +791,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('health_monitors',
monitor['health_monitor']['id'],
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
@ -994,7 +831,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
pool['pool']['id'],
subresource="stats",
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['stats'][k], v)
@ -1030,7 +867,10 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
'pools',
pool['pool']['id'],
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
self.assertIn(monitor1['health_monitor']['id'],
res['pool']['health_monitors'])
self.assertIn(monitor2['health_monitor']['id'],
@ -1079,7 +919,10 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
'pools',
pool['pool']['id'],
fmt=self.fmt)
res = self.deserialize(req.get_response(self.ext_api))
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
self.assertNotIn(monitor1['health_monitor']['id'],
res['pool']['health_monitors'])
self.assertIn(monitor2['health_monitor']['id'],
@ -1113,7 +956,7 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
'10',
'3',
True)
health_monitor = self.deserialize(req)
health_monitor = self.deserialize(self.fmt, req)
self.assertEqual(req.status_int, 201)
# Associate the health_monitor to the pool
@ -1132,9 +975,12 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('pools',
pool_id,
fmt=self.fmt)
pool_updated = self.deserialize(req.get_response(self.ext_api))
member1 = self.deserialize(res1)
member2 = self.deserialize(res2)
pool_updated = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
member1 = self.deserialize(self.fmt, res1)
member2 = self.deserialize(self.fmt, res2)
self.assertIn(member1['member']['id'],
pool_updated['pool']['members'])
self.assertIn(member2['member']['id'],
@ -1145,7 +991,10 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
req = self.new_show_request('vips',
vip_id,
fmt=self.fmt)
vip_updated = self.deserialize(req.get_response(self.ext_api))
vip_updated = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
self.assertEqual(vip_updated['vip']['pool_id'],
pool_updated['pool']['id'])

View File

@ -51,9 +51,14 @@ ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
@contextlib.contextmanager
def dummy_context_func():
yield None
def optional_ctx(obj, fallback):
if not obj:
return fallback()
@contextlib.contextmanager
def context_wrapper():
yield obj
return context_wrapper()
def etcdir(*p):
@ -70,8 +75,9 @@ def _fake_get_sorting_helper(self, request):
class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}
def setUp(self, plugin=None):
def setUp(self, plugin=None, service_plugins=None):
super(QuantumDbPluginV2TestCase, self).setUp()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
@ -95,6 +101,7 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
if not plugin:
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
# Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
@ -103,6 +110,12 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override(
'service_plugins',
[test_config.get(key, default)
for key, default in (service_plugins or {}).iteritems()]
)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
@ -111,7 +124,6 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self.api = APIRouter()
# Set the defualt port status
self.port_create_status = 'ACTIVE'
super(QuantumDbPluginV2TestCase, self).setUp()
def _is_native_bulk_supported():
plugin_obj = QuantumManager.get_plugin()
@ -162,15 +174,19 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
def _req(self, method, resource, data=None, fmt=None,
id=None, params=None, action=None):
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None):
fmt = fmt or self.fmt
if id and action:
path = '/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
elif id:
path = '/%(resource)s/%(id)s.%(fmt)s' % locals()
else:
path = '/%(resource)s.%(fmt)s' % locals()
path = '/%s.%s' % (
'/'.join(p for p in
(resource, id, subresource, sub_id, action) if p),
fmt
)
prefix = self.resource_prefix_map.get(resource)
if prefix:
path = prefix + path
content_type = 'application/%s' % fmt
body = None
@ -179,23 +195,51 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
return testlib_api.create_request(path, body, content_type, method,
query_string=params)
def new_create_request(self, resource, data, fmt=None):
return self._req('POST', resource, data, fmt)
def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None):
return self._req('POST', resource, data, fmt, id=id,
subresource=subresource)
def new_list_request(self, resource, fmt=None, params=None):
return self._req('GET', resource, None, fmt, params=params)
def new_list_request(self, resource, fmt=None, params=None,
subresource=None):
return self._req(
'GET', resource, None, fmt, params=params, subresource=subresource
)
def new_show_request(self, resource, id, fmt=None):
return self._req('GET', resource, None, fmt, id=id)
def new_show_request(self, resource, id, fmt=None, subresource=None):
return self._req(
'GET', resource, None, fmt, id=id, subresource=subresource
)
def new_delete_request(self, resource, id, fmt=None):
return self._req('DELETE', resource, None, fmt, id=id)
def new_delete_request(self, resource, id, fmt=None, subresource=None,
sub_id=None):
return self._req(
'DELETE',
resource,
None,
fmt,
id=id,
subresource=subresource,
sub_id=sub_id
)
def new_update_request(self, resource, data, id, fmt=None):
return self._req('PUT', resource, data, fmt, id=id)
def new_update_request(self, resource, data, id, fmt=None,
subresource=None):
return self._req(
'PUT', resource, data, fmt, id=id, subresource=subresource
)
def new_action_request(self, resource, data, id, action, fmt=None):
return self._req('PUT', resource, data, fmt, id=id, action=action)
def new_action_request(self, resource, data, id, action, fmt=None,
subresource=None):
return self._req(
'PUT',
resource,
data,
fmt,
id=id,
action=action,
subresource=subresource
)
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
@ -503,10 +547,7 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
host_routes=None,
shared=None,
do_delete=True):
with (self.network() if not network
else dummy_context_func()) as network_to_use:
if network:
network_to_use = network
with optional_ctx(network, self.network) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
gateway_ip,
@ -526,10 +567,7 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, no_delete=False,
**kwargs):
with (self.subnet() if not subnet
else dummy_context_func()) as subnet_to_use:
if subnet:
subnet_to_use = subnet
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
try: