vmware-nsx/quantum/tests/unit/test_db_plugin.py

2044 lines
94 KiB
Python

# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import copy
import datetime
import logging
import mock
import os
import random
import unittest2
import webob.exc
import quantum
from quantum.api.v2 import attributes
from quantum.api.v2.attributes import ATTR_NOT_SPECIFIED
from quantum.api.v2.router import APIRouter
from quantum.common import config
from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config
from quantum import context
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.openstack.common import timeutils
from quantum.tests.unit import test_extensions
from quantum.tests.unit.testlib_api import create_request
from quantum.wsgi import Serializer, JSONDeserializer
LOG = logging.getLogger(__name__)
DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
def etcdir(*p):
return os.path.join(ETCDIR, *p)
class QuantumDbPluginV2TestCase(unittest2.TestCase):
def setUp(self, plugin=None):
super(QuantumDbPluginV2TestCase, self).setUp()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
db._ENGINE = None
db._MAKER = None
# Make sure at each test a new instance of the plugin is returned
QuantumManager._instance = None
self._tenant_id = 'test-tenant'
json_deserializer = JSONDeserializer()
self._deserializers = {
'application/json': json_deserializer,
}
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
# Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
for config_file in test_config.get('config_files', []):
args.extend(['--config-file', config_file])
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.max_dns_nameservers = 2
cfg.CONF.max_subnet_host_routes = 2
self.api = APIRouter()
def _is_native_bulk_supported():
plugin_obj = QuantumManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"
% plugin_obj.__class__.__name__)
return getattr(plugin_obj, native_bulk_attr_name, False)
self._skip_native_bulk = not _is_native_bulk_supported()
ext_mgr = test_config.get('extension_manager', None)
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def tearDown(self):
super(QuantumDbPluginV2TestCase, self).tearDown()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
db._ENGINE = None
db._MAKER = None
cfg.CONF.reset()
def _req(self, method, resource, data=None, fmt='json',
id=None, params=None, action=None):
if id and action:
path = '/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
elif id:
path = '/%(resource)s/%(id)s.%(fmt)s' % locals()
else:
path = '/%(resource)s.%(fmt)s' % locals()
content_type = 'application/%s' % fmt
body = None
if data is not None: # empty dict is valid
body = Serializer().serialize(data, content_type)
return create_request(path,
body,
content_type,
method,
query_string=params)
def new_create_request(self, resource, data, fmt='json'):
return self._req('POST', resource, data, fmt)
def new_list_request(self, resource, fmt='json', params=None):
return self._req('GET', resource, None, fmt, params=params)
def new_show_request(self, resource, id, fmt='json'):
return self._req('GET', resource, None, fmt, id=id)
def new_delete_request(self, resource, id, fmt='json'):
return self._req('DELETE', resource, None, fmt, id=id)
def new_update_request(self, resource, data, id, fmt='json'):
return self._req('PUT', resource, data, fmt, id=id)
def new_action_request(self, resource, data, id, action, fmt='json'):
return self._req('PUT', resource, data, fmt, id=id, action=action)
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
data = self._deserializers[ctype].deserialize(response.body)['body']
return data
def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
""" Creates a bulk request for any kind of resource """
objects = []
collection = "%ss" % resource
for i in range(0, number):
obj = copy.deepcopy(data)
obj[resource]['name'] = "%s_%s" % (name, i)
if 'override' in kwargs and i in kwargs['override']:
obj[resource].update(kwargs['override'][i])
objects.append(obj)
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['quantum.context'] = kwargs['context']
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_status_up, **kwargs):
data = {'network': {'name': name,
'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'tenant_id', 'shared'):
# Arg must be present and not empty
if arg in kwargs and kwargs[arg]:
data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
network_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
return network_req.get_response(self.api)
def _create_network_bulk(self, fmt, number, name,
admin_status_up, **kwargs):
base_data = {'network': {'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
def _create_subnet(self, fmt, net_id, cidr,
expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
'ip_version': 4,
'tenant_id': self._tenant_id}}
for arg in ('allocation_pools',
'ip_version', 'tenant_id',
'enable_dhcp', 'allocation_pools',
'dns_nameservers', 'host_routes'):
# Arg must be present and not null (but can be false)
if arg in kwargs and kwargs[arg] is not None:
data['subnet'][arg] = kwargs[arg]
if kwargs.get('gateway_ip', ATTR_NOT_SPECIFIED) != ATTR_NOT_SPECIFIED:
data['subnet']['gateway_ip'] = kwargs['gateway_ip']
subnet_req = self.new_create_request('subnets', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
subnet_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
subnet_res = subnet_req.get_response(self.api)
if expected_res_status:
self.assertEqual(subnet_res.status_int, expected_res_status)
return subnet_res
def _create_subnet_bulk(self, fmt, number, net_id, name,
ip_version=4, **kwargs):
base_data = {'subnet': {'network_id': net_id,
'ip_version': ip_version,
'tenant_id': self._tenant_id}}
# auto-generate cidrs as they should not overlap
overrides = dict((k, v)
for (k, v) in zip(range(0, number),
[{'cidr': "10.0.%s.0/24" % num}
for num in range(0, number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
content_type = 'application/' + fmt
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'device_id',
'mac_address', 'name', 'fixed_ips',
'tenant_id', 'device_owner'):
# Arg must be present and not empty
if arg in kwargs and kwargs[arg]:
data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _list_ports(self, fmt, expected_res_status=None,
net_id=None, **kwargs):
query_params = None
if net_id:
query_params = "network_id=%s" % net_id
port_req = self.new_list_request('ports', fmt, query_params)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _create_port_bulk(self, fmt, number, net_id, name,
admin_status_up, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True,
dns_nameservers=None, host_routes=None):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
gateway_ip=gateway,
tenant_id=network['network']['tenant_id'],
allocation_pools=allocation_pools,
ip_version=ip_version,
enable_dhcp=enable_dhcp,
dns_nameservers=dns_nameservers,
host_routes=host_routes)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_port(self, fmt, net_id, **kwargs):
res = self._create_port(fmt, net_id, **kwargs)
return self.deserialize(fmt, res)
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports']:
return self.api
else:
return self.ext_api
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code):
req = self.new_delete_request(collection, id)
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(res.status_int, expected_code)
def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
req = self.new_show_request(resource, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code):
req = self.new_update_request(resource, new_data, id)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
def _list(self, resource):
req = self.new_list_request(resource)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize('json', res)
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """
def second_call(*args, **kwargs):
raise Exception('boom')
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
def _validate_behavior_on_bulk_failure(self, res, collection):
self.assertEqual(res.status_int, 500)
req = self.new_list_request(collection)
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
items = self.deserialize('json', res)
self.assertEqual(len(items[collection]), 0)
def _validate_behavior_on_bulk_success(self, res, collection,
names=['test_0', 'test_1']):
self.assertEqual(res.status_int, 201)
items = self.deserialize('json', res)[collection]
self.assertEqual(len(items), 2)
self.assertEqual(items[0]['name'], 'test_0')
self.assertEqual(items[1]['name'], 'test_1')
@contextlib.contextmanager
def network(self, name='net1',
admin_status_up=True,
fmt='json',
**kwargs):
res = self._create_network(fmt,
name,
admin_status_up,
**kwargs)
network = self.deserialize(fmt, res)
# TODO(salvatore-orlando): do exception handling in this test module
# in a uniform way (we do it differently for ports, subnets, and nets
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield network
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None,
gateway_ip=ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
fmt='json',
ip_version=4,
allocation_pools=None,
enable_dhcp=True,
dns_nameservers=None,
host_routes=None):
# TODO(anyone) DRY this
# NOTE(salvatore-orlando): we can pass the network object
# to gen function anyway, and then avoid the repetition
if not network:
with self.network() as network:
subnet = self._make_subnet(fmt,
network,
gateway_ip,
cidr,
allocation_pools,
ip_version,
enable_dhcp,
dns_nameservers,
host_routes)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
else:
subnet = self._make_subnet(fmt,
network,
gateway_ip,
cidr,
allocation_pools,
ip_version,
enable_dhcp,
dns_nameservers,
host_routes)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
def port(self, subnet=None, fixed_ips=None, fmt='json', **kwargs):
if not subnet:
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
**kwargs)
yield port
self._delete('ports', port['port']['id'])
else:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
**kwargs)
yield port
self._delete('ports', port['port']['id'])
class TestBasicGet(QuantumDbPluginV2TestCase):
def test_single_get_admin(self):
plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
def test_single_get_tenant(self):
plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
def test_create_returns_201(self):
res = self._create_network('json', 'net2', True)
self.assertEquals(res.status_int, 201)
def test_list_returns_200(self):
req = self.new_list_request('networks')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
def _check_list_with_fields(self, res, field_name):
self.assertEquals(res.status_int, 200)
body = self.deserialize('json', res)
# further checks: 1 networks
self.assertEquals(len(body['networks']), 1)
# 1 field in the network record
self.assertEquals(len(body['networks'][0]), 1)
# field is 'name'
self.assertIn(field_name, body['networks'][0])
def test_list_with_fields(self):
self._create_network('json', 'some_net', True)
req = self.new_list_request('networks', params="fields=name")
res = req.get_response(self.api)
self._check_list_with_fields(res, 'name')
def test_list_with_fields_noadmin(self):
tenant_id = 'some_tenant'
self._create_network('json',
'some_net',
True,
tenant_id=tenant_id,
set_context=True)
req = self.new_list_request('networks', params="fields=name")
req.environ['quantum.context'] = context.Context('', tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'name')
def test_list_with_fields_noadmin_and_policy_field(self):
""" If a field used by policy is selected, do not duplicate it.
Verifies that if the field parameter explicitly specifies a field
which is used by the policy engine, then it is not duplicated
in the response.
"""
tenant_id = 'some_tenant'
self._create_network('json',
'some_net',
True,
tenant_id=tenant_id,
set_context=True)
req = self.new_list_request('networks', params="fields=tenant_id")
req.environ['quantum.context'] = context.Context('', tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'tenant_id')
def test_show_returns_200(self):
with self.network() as net:
req = self.new_show_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
def test_delete_returns_204(self):
res = self._create_network('json', 'net1', True)
net = self.deserialize('json', res)
req = self.new_delete_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_update_returns_200(self):
with self.network() as net:
req = self.new_update_request('networks',
{'network': {'name': 'steve'}},
net['network']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
def test_bad_route_404(self):
req = self.new_list_request('doohickeys')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 404)
class TestPortsV2(QuantumDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', 'ACTIVE')]
with self.port(name='myname') as port:
for k, v in keys:
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals('myname', port['port']['name'])
def test_create_port_bad_tenant(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': 'bad_tenant_id',
'admin_state_up': True,
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEquals(res.status_int, 403)
def test_create_port_public_network(self):
keys = [('admin_state_up', True), ('status', 'ACTIVE')]
with self.network(shared=True) as network:
port_res = self._create_port('json',
network['network']['id'],
201,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize('json', port_res)
for k, v in keys:
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
self._delete('ports', port['port']['id'])
def test_create_ports_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
with self.network() as net:
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize('json', res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize('json', res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_wrong_input(self):
with self.network() as net:
overrides = {1: {'admin_state_up': 'doh'}}
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True,
override=overrides)
self.assertEqual(res.status_int, 400)
req = self.new_list_request('ports')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
ports = self.deserialize('json', res)
self.assertEqual(len(ports['ports']), 0)
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = QuantumManager.get_plugin().create_port
with mock.patch.object(QuantumManager.get_plugin(),
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_port_bulk('json', 2,
net['network']['id'],
'test',
True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'ports')
def test_create_ports_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
ctx = context.get_admin_context()
with self.network() as net:
orig = QuantumManager._instance.plugin.create_port
with mock.patch.object(QuantumManager._instance.plugin,
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True, context=ctx)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'ports')
def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
port_list = self.deserialize('json', req.get_response(self.api))
self.assertEqual(len(port_list['ports']), 2)
ids = [p['id'] for p in port_list['ports']]
self.assertTrue(port1['port']['id'] in ids)
self.assertTrue(port2['port']['id'] in ids)
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
portres_1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='tenant_1',
set_context=True)
portres_2 = self._create_port('json',
network['network']['id'],
201,
tenant_id='tenant_2',
set_context=True)
port1 = self.deserialize('json', portres_1)
port2 = self.deserialize('json', portres_2)
def _list_and_test_ports(expected_len, ports, tenant_id=None):
set_context = tenant_id is not None
port_res = self._list_ports('json',
200,
network['network']['id'],
tenant_id=tenant_id,
set_context=set_context)
port_list = self.deserialize('json', port_res)
self.assertEqual(len(port_list['ports']), expected_len)
ids = [p['id'] for p in port_list['ports']]
for port in ports:
self.assertIn(port['port']['id'], ids)
# Admin request - must return both ports
_list_and_test_ports(2, [port1, port2])
# Tenant_1 request - must return single port
_list_and_test_ports(1, [port1], tenant_id='tenant_1')
# Tenant_2 request - must return single port
_list_and_test_ports(1, [port2], tenant_id='tenant_2')
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_show_port(self):
with self.port() as port:
req = self.new_show_request('ports', port['port']['id'], 'json')
sport = self.deserialize('json', req.get_response(self.api))
self.assertEquals(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
port_id = None
with self.port() as port:
port_id = port['port']['id']
req = self.new_show_request('port', 'json', port['port']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 404)
def test_delete_port_public_network(self):
with self.network(shared=True) as network:
port_res = self._create_port('json',
network['network']['id'],
201,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize('json', port_res)
port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
# Todo: verify!!!
def test_update_port(self):
with self.port() as port:
data = {'port': {'admin_state_up': False}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
def test_delete_network_if_port_exists(self):
fmt = 'json'
with self.port() as port:
net_id = port['port']['network_id']
req = self.new_delete_request('networks',
port['port']['network_id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 409)
def test_update_port_delete_ip(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': []}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
self.assertEqual(res['port']['fixed_ips'],
data['port']['fixed_ips'])
def test_update_port_update_ip(self):
"""Test update of port IP.
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.10')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_update_ips(self):
"""Update IP and generate new IP on port.
Check a port update with the specified subnet_id's. A IP address
will be allocated for each subnet_id.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_add_additional_ip(self):
"""Test update of port with additional IP."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEquals(len(ips), 2)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEquals(ips[1]['ip_address'], '10.0.0.3')
self.assertEquals(ips[1]['subnet_id'], subnet['subnet']['id'])
def test_requested_duplicate_mac(self):
fmt = 'json'
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac[0:2]
self.assertTrue(mac.startswith(base_mac))
kwargs = {"mac_address": mac}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 409)
def test_mac_generation(self):
cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56"))
def test_mac_generation_4octet(self):
cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56:78"))
def test_bad_mac_format(self):
cfg.CONF.set_override('base_mac', "bad_mac")
try:
self.plugin._check_base_mac_format()
except:
return
self.fail("No exception for illegal base_mac format")
def test_mac_exhaustion(self):
# rather than actually consuming all MAC (would take a LONG time)
# we just raise the exception that would result.
@staticmethod
def fake_gen_mac(context, net_id):
raise q_exc.MacAddressGenerationFailure(net_id=net_id)
fmt = 'json'
with mock.patch.object(quantum.db.db_base_plugin_v2.QuantumDbPluginV2,
'_generate_mac', new=fake_gen_mac):
res = self._create_network(fmt=fmt, name='net1',
admin_status_up=True)
network = self.deserialize(fmt, res)
net_id = network['network']['id']
res = self._create_port(fmt, net_id=net_id)
self.assertEquals(res.status_int, 503)
def test_requested_duplicate_ip(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Check configuring of duplicate IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': ips[0]['ip_address']}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 409)
def test_requested_subnet_delete(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
req = self.new_delete_request('subnet',
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 404)
def test_requested_subnet_id(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
def test_requested_subnet_id_not_on_network(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
# Create new network
res = self._create_network(fmt=fmt, name='net2',
admin_status_up=True)
network2 = self.deserialize(fmt, res)
subnet2 = self._make_subnet(fmt, network2, "1.1.1.1",
"1.1.1.0/24", ip_version=4)
net_id = port['port']['network_id']
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id':
subnet2['subnet']['id']}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
self.assertEquals(res.status_int, 400)
def test_requested_subnet_id_v4_and_v6(self):
fmt = 'json'
with self.subnet() as subnet:
# Get a IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::0/124',
ip_version=6,
gateway_ip=ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(fmt, res)
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}]}
res = self._create_port(fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(fmt, res)
ips = port3['port']['fixed_ips']
self.assertEquals(len(ips), 2)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::2')
self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
res = self._create_port(fmt, net_id=net_id)
port4 = self.deserialize(fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEquals(len(ips), 2)
self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::3')
self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def test_range_allocation(self):
fmt = 'json'
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port = self.deserialize(fmt, res)
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 5)
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
'10.0.0.6']
for i in range(len(alloc)):
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port = self.deserialize(fmt, res)
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 5)
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
'11.0.0.5']
for i in range(len(alloc)):
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
def test_requested_invalid_fixed_ips(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Test invalid subnet_id
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id':
'00000000-ffff-ffff-ffff-000000000000'}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 404)
# Test invalid IP address on specified subnet_id
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id'],
'ip_address': '1.1.1.1'}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 400)
# Test invalid addresses - IP's not on subnet or network
# address or broadcast address
bad_ips = ['1.1.1.1', '10.0.0.0', '10.0.0.255']
net_id = port['port']['network_id']
for ip in bad_ips:
kwargs = {"fixed_ips": [{'ip_address': ip}]}
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
self.assertEquals(res.status_int, 400)
# Enable allocation of gateway address
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.1'}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.1')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
def test_requested_split(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.5')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP's
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
for a in allocated:
res = self._create_port(fmt, net_id=net_id)
port2 = self.deserialize(fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], a)
self.assertEquals(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_requested_ips_only(self):
fmt = 'json'
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
'10.0.0.3', '10.0.0.17', '10.0.0.19']
ports_to_delete = []
for i in ips_only:
kwargs = {"fixed_ips": [{'ip_address': i}]}
net_id = port['port']['network_id']
res = self._create_port(fmt, net_id=net_id, **kwargs)
port = self.deserialize(fmt, res)
ports_to_delete.append(port)
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], i)
self.assertEquals(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_recycling(self):
fmt = 'json'
with self.subnet(cidr='10.0.1.0/24') as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.1.2')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
net_id = port['port']['network_id']
ports = []
for i in range(16 - 3):
res = self._create_port(fmt, net_id=net_id)
p = self.deserialize(fmt, res)
ports.append(p)
for i in range(16 - 3):
x = random.randrange(0, len(ports), 1)
p = ports.pop(x)
self._delete('ports', p['port']['id'])
res = self._create_port(fmt, net_id=net_id)
port = self.deserialize(fmt, res)
ips = port['port']['fixed_ips']
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.1.3')
self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port['port']['id'])
def test_invalid_admin_state(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': network['network']['tenant_id'],
'admin_state_up': 7,
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_invalid_mac_address(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': network['network']['tenant_id'],
'admin_state_up': 1,
'mac_address': 'mac',
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEquals(res.status_int, 422)
class TestNetworksV2(QuantumDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are
# effectively tested above
def test_create_network(self):
name = 'net1'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False)]
with self.network(name=name) as net:
for k, v in keys:
self.assertEquals(net['network'][k], v)
def test_create_public_network(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with self.network(name=name, shared=True) as net:
for k, v in keys:
self.assertEquals(net['network'][k], v)
def test_create_public_network_no_admin_tenant(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
tenant_id="another_tenant",
set_context=True):
pass
self.assertEquals(ctx_manager.exception.code, 403)
def test_update_network(self):
with self.network() as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['network']['name'],
data['network']['name'])
def test_update_shared_network_noadmin_returns_403(self):
with self.network(shared=True) as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
req.environ['quantum.context'] = context.Context('', 'somebody')
res = req.get_response(self.api)
# The API layer always returns 404 on updates in place of 403
self.assertEqual(res.status_int, 404)
def test_update_network_set_shared(self):
with self.network(shared=False) as network:
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertTrue(res['network']['shared'])
def test_update_network_set_not_shared_single_tenant(self):
with self.network(shared=True) as network:
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertFalse(res['network']['shared'])
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_other_tenant_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
res2 = self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
port2 = self.deserialize('json', res2)
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
self._create_subnet('json',
network['network']['id'],
'10.0.0.0/24',
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
port1 = self.deserialize('json', res1)
self._delete('ports', port1['port']['id'])
def test_create_networks_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
res = self._create_network_bulk('json', 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
def test_create_networks_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
res = self._create_network_bulk('json', 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
def test_create_networks_bulk_wrong_input(self):
res = self._create_network_bulk('json', 2, 'test', True,
override={1:
{'admin_state_up': 'doh'}})
self.assertEqual(res.status_int, 400)
req = self.new_list_request('networks')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
nets = self.deserialize('json', res)
self.assertEqual(len(nets['networks']), 0)
def test_create_networks_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
orig = QuantumManager.get_plugin().create_network
#ensures the API choose the emulation code path
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with mock.patch.object(QuantumManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk('json', 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'networks')
def test_create_networks_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
orig = QuantumManager.get_plugin().create_network
with mock.patch.object(QuantumManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk('json', 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'networks')
def test_list_networks(self):
with self.network(name='net1') as net1:
with self.network(name='net2') as net2:
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['networks'][0]['name'],
net1['network']['name'])
self.assertEquals(res['networks'][1]['name'],
net2['network']['name'])
def test_list_networks_with_parameters(self):
with self.network(name='net1', admin_status_up=False) as net1:
with self.network(name='net2') as net2:
req = self.new_list_request('networks',
params='admin_state_up=False')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(1, len(res['networks']))
self.assertEquals(res['networks'][0]['name'],
net1['network']['name'])
req = self.new_list_request('networks',
params='admin_state_up=true')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(1, len(res['networks']))
self.assertEquals(res['networks'][0]['name'],
net2['network']['name'])
def test_list_networks_with_parameters_invalid_values(self):
with self.network(name='net1', admin_status_up=False) as net1:
with self.network(name='net2') as net2:
req = self.new_list_request('networks',
params='admin_state_up=fake')
res = req.get_response(self.api)
self.assertEquals(422, res.status_int)
def test_show_network(self):
with self.network(name='net1') as net:
req = self.new_show_request('networks', net['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['network']['name'],
net['network']['name'])
def test_invalid_admin_status(self):
fmt = 'json'
value = [[7, False, 400], [True, True, 201], ["True", True, 201],
["true", True, 201], [1, True, 201], ["False", False, 201],
[False, False, 201], ["false", False, 201],
["7", False, 400]]
for v in value:
data = {'network': {'name': 'net',
'admin_state_up': v[0],
'tenant_id': self._tenant_id}}
network_req = self.new_create_request('networks', data)
req = network_req.get_response(self.api)
self.assertEquals(req.status_int, v[2])
if v[2] == 201:
res = self.deserialize(fmt, req)
self.assertEquals(res['network']['admin_state_up'], v[1])
class TestSubnetsV2(QuantumDbPluginV2TestCase):
def _test_create_subnet(self, network=None, **kwargs):
keys = kwargs.copy()
keys.setdefault('cidr', '10.0.0.0/24')
keys.setdefault('ip_version', 4)
keys.setdefault('enable_dhcp', True)
with self.subnet(network=network, **keys) as subnet:
# verify the response has each key with the correct value
for k in keys:
self.assertIn(k, subnet['subnet'])
self.assertEquals(subnet['subnet'][k], keys[k])
return subnet
def test_create_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
subnet = self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr)
self.assertTrue('name' in subnet['subnet'])
def test_create_two_subnets(self):
gateway_ips = ['10.0.0.1', '10.0.1.1']
cidrs = ['10.0.0.0/24', '10.0.1.0/24']
with self.network() as network:
with self.subnet(network=network,
gateway_ip=gateway_ips[0],
cidr=cidrs[0]):
with self.subnet(network=network,
gateway_ip=gateway_ips[1],
cidr=cidrs[1]):
net_req = self.new_show_request('networks',
network['network']['id'])
raw_res = net_req.get_response(self.api)
net_res = self.deserialize('json', raw_res)
for subnet_id in net_res['network']['subnets']:
sub_req = self.new_show_request('subnets', subnet_id)
raw_res = sub_req.get_response(self.api)
sub_res = self.deserialize('json', raw_res)
self.assertIn(sub_res['subnet']['cidr'], cidrs)
self.assertIn(sub_res['subnet']['gateway_ip'],
gateway_ips)
def test_create_two_subnets_same_cidr_returns_400(self):
gateway_ip_1 = '10.0.0.1'
cidr_1 = '10.0.0.0/24'
gateway_ip_2 = '10.0.0.10'
cidr_2 = '10.0.0.0/24'
with self.network() as network:
with self.subnet(network=network,
gateway_ip=gateway_ip_1,
cidr=cidr_1):
with self.assertRaises(
webob.exc.HTTPClientError) as ctx_manager:
with self.subnet(network=network,
gateway_ip=gateway_ip_2,
cidr=cidr_2):
pass
self.assertEquals(ctx_manager.exception.code, 400)
def test_create_subnets_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
with self.network() as net:
res = self._create_subnet_bulk('json', 2, net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
def test_create_subnets_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
def test_create_subnets_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = QuantumManager.get_plugin().create_subnet
with mock.patch.object(QuantumManager.get_plugin(),
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'subnets')
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
orig = QuantumManager._instance.plugin.create_subnet
with mock.patch.object(QuantumManager._instance.plugin,
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'subnets')
def test_delete_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_status_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_delete_network(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_status_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4)
req = self.new_delete_request('networks', network['network']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_create_subnet_bad_tenant(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': 'bad_tenant_id',
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 403)
def test_create_subnet_defaults(self):
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.254'}]
enable_dhcp = True
subnet = self._test_create_subnet()
# verify cidr & gw have been correctly generated
self.assertEquals(subnet['subnet']['cidr'], cidr)
self.assertEquals(subnet['subnet']['gateway_ip'], gateway)
self.assertEquals(subnet['subnet']['enable_dhcp'], enable_dhcp)
self.assertEquals(subnet['subnet']['allocation_pools'],
allocation_pools)
def test_create_subnet_with_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_none_gateway(self):
cidr = '10.0.0.0/24'
self._test_create_subnet(gateway_ip=None,
cidr=cidr)
def test_create_subnet_with_none_gateway_fully_allocated(self):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.254'}]
self._test_create_subnet(gateway_ip=None,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_none_gateway_allocation_pool(self):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
self._test_create_subnet(gateway_ip=None,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::0/80'
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_large_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/8'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.1.0.0',
'end': '10.200.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_multiple_allocation_pools(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.0.0.110',
'end': '10.0.0.150'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_dhcp_disabled(self):
enable_dhcp = False
self._test_create_subnet(enable_dhcp=enable_dhcp)
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
gateway_ip = '10.0.0.50'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.100'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 409)
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.150'},
{'start': '10.0.0.140',
'end': '10.0.0.180'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 409)
def test_create_subnet_invalid_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.256'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 400)
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.1.6'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 400)
def test_update_subnet(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['subnet']['gateway_ip'],
data['subnet']['gateway_ip'])
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet:
req = self.new_show_request('subnets',
subnet['subnet']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['subnet']['id'],
subnet['subnet']['id'])
self.assertEquals(res['subnet']['network_id'],
network['network']['id'])
def test_list_subnets(self):
# NOTE(jkoelker) This would be a good place to use contextlib.nested
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway_ip='10.0.0.1',
cidr='10.0.0.0/24') as subnet:
with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request('subnets')
res = self.deserialize('json',
req.get_response(self.api))
res1 = res['subnets'][0]
res2 = res['subnets'][1]
self.assertEquals(res1['cidr'],
subnet['subnet']['cidr'])
self.assertEquals(res2['cidr'],
subnet2['subnet']['cidr'])
def test_list_subnets_with_parameter(self):
# NOTE(jkoelker) This would be a good place to use contextlib.nested
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway_ip='10.0.0.1',
cidr='10.0.0.0/24') as subnet:
with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request(
'subnets',
params='ip_version=4&ip_version=6')
res = self.deserialize('json',
req.get_response(self.api))
self.assertEquals(2, len(res['subnets']))
req = self.new_list_request('subnets',
params='ip_version=6')
res = self.deserialize('json',
req.get_response(self.api))
self.assertEquals(0, len(res['subnets']))
def test_invalid_ip_version(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 7,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 422)
def test_invalid_subnet(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'invalid',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 422)
def test_invalid_ip_address(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': 'ipaddress'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 422)
def test_invalid_uuid(self):
with self.network() as network:
data = {'subnet': {'network_id': 'invalid-uuid',
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 422)
def test_create_subnet_with_one_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
dns_nameservers = ['1.2.3.4']
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
dns_nameservers=dns_nameservers)
def test_create_subnet_with_two_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
dns_nameservers = ['1.2.3.4', '4.3.2.1']
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
dns_nameservers=dns_nameservers)
def test_create_subnet_with_too_many_dns(self):
with self.network() as network:
dns_list = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1',
'dns_nameservers': dns_list}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_create_subnet_with_one_host_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
host_routes=host_routes)
def test_create_subnet_with_two_host_routes(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'},
{'destination': '12.0.0.0/8',
'nexthop': '4.3.2.1'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
host_routes=host_routes)
def test_create_subnet_with_too_many_routes(self):
with self.network() as network:
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'},
{'destination': '12.0.0.0/8',
'nexthop': '4.3.2.1'},
{'destination': '141.212.0.0/16',
'nexthop': '2.2.2.2'}]
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1',
'host_routes': host_routes}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_dns(self):
with self.subnet() as subnet:
data = {'subnet': {'dns_nameservers': ['11.0.0.1']}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['subnet']['dns_nameservers'],
data['subnet']['dns_nameservers'])
def test_update_subnet_dns_with_too_many_entries(self):
with self.subnet() as subnet:
dns_list = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
data = {'subnet': {'dns_nameservers': dns_list}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_update_subnet_route(self):
with self.subnet() as subnet:
data = {'subnet': {'host_routes':
[{'destination': '12.0.0.0/8', 'nexthop': '1.2.3.4'}]}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['subnet']['host_routes'],
data['subnet']['host_routes'])
def test_update_subnet_route_with_too_many_entries(self):
with self.subnet() as subnet:
data = {'subnet': {'host_routes': [
{'destination': '12.0.0.0/8', 'nexthop': '1.2.3.4'},
{'destination': '13.0.0.0/8', 'nexthop': '1.2.3.5'},
{'destination': '14.0.0.0/8', 'nexthop': '1.2.3.6'}]}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 400)
def test_delete_subnet_with_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
dns_nameservers = ['1.2.3.4']
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_status_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4,
dns_nameservers=dns_nameservers)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_delete_subnet_with_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_status_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4,
host_routes=host_routes)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_delete_subnet_with_dns_and_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
dns_nameservers = ['1.2.3.4']
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_status_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4,
dns_nameservers=dns_nameservers,
host_routes=host_routes)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_default_allocation_expiration(self):
reference = datetime.datetime(2012, 8, 13, 23, 11, 0)
timeutils.utcnow.override_time = reference
cfg.CONF.set_override('dhcp_lease_duration', 120)
expires = QuantumManager.get_plugin()._default_allocation_expiration()
timeutils.utcnow
cfg.CONF.reset()
timeutils.utcnow.override_time = None
self.assertEqual(expires, reference + datetime.timedelta(seconds=120))