Fix bulk create operations and make them atomic.

Bug 1024844
Bug 1020639

The API layer is now able to issue bulk create requests to the plugin,
assuming that the plugin supports them. Otherwise, the API layer will
emulate atomic behavior.
This patch also implements OVS plugin support for bulk requests.

Change-Id: I515148d870d0dff8371862fe577c477538364929
This commit is contained in:
Salvatore Orlando 2012-08-13 09:31:38 -07:00
parent 827e470a86
commit ff837025e9
11 changed files with 453 additions and 62 deletions

View File

@ -39,6 +39,8 @@ api_paste_config = api-paste.ini
# Maximum amount of retries to generate a unique MAC address # Maximum amount of retries to generate a unique MAC address
# mac_generation_retries = 16 # mac_generation_retries = 16
# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# RPC configuration options. Defined in rpc __init__ # RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu. # The messaging module to use, defaults to kombu.
# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu # rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu

View File

@ -117,15 +117,23 @@ def verbose(request):
class Controller(object): class Controller(object):
def __init__(self, plugin, collection, resource, attr_info): def __init__(self, plugin, collection, resource,
attr_info, allow_bulk=False):
self._plugin = plugin self._plugin = plugin
self._collection = collection self._collection = collection
self._resource = resource self._resource = resource
self._attr_info = attr_info self._attr_info = attr_info
self._allow_bulk = allow_bulk
self._native_bulk = self._is_native_bulk_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items() self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')] if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network') self._publisher_id = notifier_api.publisher_id('network')
def _is_native_bulk_supported(self):
native_bulk_attr_name = ("_%s__native_bulk_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_bulk_attr_name, False)
def _is_visible(self, attr): def _is_visible(self, attr):
attr_val = self._attr_info.get(attr) attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible'] return attr_val and attr_val['is_visible']
@ -209,6 +217,32 @@ class Controller(object):
# doesn't exist # doesn't exist
raise webob.exc.HTTPNotFound() raise webob.exc.HTTPNotFound()
def _emulate_bulk_create(self, obj_creator, request, body):
objs = []
try:
for item in body[self._collection]:
kwargs = {self._resource: item}
objs.append(self._view(obj_creator(request.context,
**kwargs)))
return objs
# Note(salvatore-orlando): broad catch as in theory a plugin
# could raise any kind of exception
except Exception as ex:
for obj in objs:
delete_action = "delete_%s" % self._resource
obj_deleter = getattr(self._plugin, delete_action)
try:
obj_deleter(request.context, obj['id'])
except Exception:
# broad catch as our only purpose is to log the exception
LOG.exception("Unable to undo add for %s %s",
self._resource, obj['id'])
# TODO(salvatore-orlando): The object being processed when the
# plugin raised might have been created or not in the db.
# We need a way for ensuring that if it has been created,
# it is then deleted
raise
def create(self, request, body=None): def create(self, request, body=None):
"""Creates a new instance of the requested entity""" """Creates a new instance of the requested entity"""
notifier_api.notify(request.context, notifier_api.notify(request.context,
@ -216,10 +250,8 @@ class Controller(object):
self._resource + '.create.start', self._resource + '.create.start',
notifier_api.INFO, notifier_api.INFO,
body) body)
body = self._prepare_request_body(request.context, body, True, body = self._prepare_request_body(request.context, body, True)
allow_bulk=True)
action = "create_%s" % self._resource action = "create_%s" % self._resource
# Check authz # Check authz
try: try:
if self._collection in body: if self._collection in body:
@ -256,16 +288,30 @@ class Controller(object):
LOG.exception("Create operation not authorized") LOG.exception("Create operation not authorized")
raise webob.exc.HTTPForbidden() raise webob.exc.HTTPForbidden()
obj_creator = getattr(self._plugin, action) def notify(create_result):
kwargs = {self._resource: body} notifier_api.notify(request.context,
obj = obj_creator(request.context, **kwargs) self._publisher_id,
result = {self._resource: self._view(obj)} self._resource + '.create.end',
notifier_api.notify(request.context, notifier_api.INFO,
self._publisher_id, create_result)
self._resource + '.create.end', return create_result
notifier_api.INFO,
result) if self._collection in body and self._native_bulk:
return result # plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
objs = obj_creator(request.context, body)
return notify({self._collection: [self._view(obj)
for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
if self._collection in body:
# Emulate atomic bulk behavior
objs = self._emulate_bulk_create(obj_creator, request, body)
return notify({self._collection: objs})
else:
kwargs = {self._resource: body}
obj = obj_creator(request.context, **kwargs)
return notify({self._resource: self._view(obj)})
def delete(self, request, id): def delete(self, request, id):
"""Deletes the specified entity""" """Deletes the specified entity"""
@ -355,8 +401,7 @@ class Controller(object):
" that tenant_id is specified") " that tenant_id is specified")
raise webob.exc.HTTPBadRequest(msg) raise webob.exc.HTTPBadRequest(msg)
def _prepare_request_body(self, context, body, is_create, def _prepare_request_body(self, context, body, is_create):
allow_bulk=False):
""" verifies required attributes are in request body, and that """ verifies required attributes are in request body, and that
an attribute is only specified if it is allowed for the given an attribute is only specified if it is allowed for the given
operation (create/update). operation (create/update).
@ -369,7 +414,7 @@ class Controller(object):
raise webob.exc.HTTPBadRequest(_("Resource body required")) raise webob.exc.HTTPBadRequest(_("Resource body required"))
body = body or {self._resource: {}} body = body or {self._resource: {}}
if self._collection in body and allow_bulk: if self._collection in body and self._allow_bulk:
bulk_body = [self._prepare_request_body(context, bulk_body = [self._prepare_request_body(context,
{self._resource: b}, {self._resource: b},
is_create) is_create)
@ -382,7 +427,7 @@ class Controller(object):
return {self._collection: bulk_body} return {self._collection: bulk_body}
elif self._collection in body and not allow_bulk: elif self._collection in body and not self._allow_bulk:
raise webob.exc.HTTPBadRequest("Bulk operation not supported") raise webob.exc.HTTPBadRequest("Bulk operation not supported")
res_dict = body.get(self._resource) res_dict = body.get(self._resource)
@ -459,8 +504,8 @@ class Controller(object):
}) })
def create_resource(collection, resource, plugin, params): def create_resource(collection, resource, plugin, params, allow_bulk=False):
controller = Controller(plugin, collection, resource, params) controller = Controller(plugin, collection, resource, params, allow_bulk)
# NOTE(jkoelker) To anyone wishing to add "proper" xml support # NOTE(jkoelker) To anyone wishing to add "proper" xml support
# this is where you do it # this is where you do it

View File

@ -69,7 +69,6 @@ class APIRouter(wsgi.Router):
def __init__(self, **local_config): def __init__(self, **local_config):
mapper = routes_mapper.Mapper() mapper = routes_mapper.Mapper()
plugin = manager.QuantumManager.get_plugin() plugin = manager.QuantumManager.get_plugin()
ext_mgr = extensions.PluginAwareExtensionManager.get_instance() ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP) ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
@ -81,8 +80,10 @@ class APIRouter(wsgi.Router):
'port': 'ports'} 'port': 'ports'}
def _map_resource(collection, resource, params): def _map_resource(collection, resource, params):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(collection, resource, controller = base.create_resource(collection, resource,
plugin, params) plugin, params,
allow_bulk=allow_bulk)
mapper_kwargs = dict(controller=controller, mapper_kwargs = dict(controller=controller,
requirements=REQUIREMENTS, requirements=REQUIREMENTS,
**col_kwargs) **col_kwargs)

View File

@ -43,7 +43,8 @@ core_opts = [
cfg.StrOpt('core_plugin', cfg.StrOpt('core_plugin',
default='quantum.plugins.sample.SamplePlugin.FakePlugin'), default='quantum.plugins.sample.SamplePlugin.FakePlugin'),
cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"), cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"),
cfg.IntOpt('mac_generation_retries', default=16) cfg.IntOpt('mac_generation_retries', default=16),
cfg.BoolOpt('allow_bulk', default=True),
] ]
# Register the configuration options # Register the configuration options

View File

@ -41,6 +41,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
certain events. certain events.
""" """
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
def __init__(self): def __init__(self):
# NOTE(jkoelker) This is an incomlete implementation. Subclasses # NOTE(jkoelker) This is an incomlete implementation. Subclasses
# must override __init__ and setup the database # must override __init__ and setup the database
@ -673,12 +678,34 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
"device_id": port["device_id"]} "device_id": port["device_id"]}
return self._fields(res, fields) return self._fields(res, fields)
def _create_bulk(self, resource, context, request_items):
objects = []
collection = "%ss" % resource
items = request_items[collection]
context.session.begin(subtransactions=True)
try:
for item in items:
obj_creator = getattr(self, 'create_%s' % resource)
objects.append(obj_creator(context, item))
context.session.commit()
except Exception:
LOG.exception("An exception occured while creating "
"the port:%s", item)
context.session.rollback()
raise
return objects
def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks)
def create_network(self, context, network): def create_network(self, context, network):
""" handle creation of a single network """
# single request processing
n = network['network'] n = network['network']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises # unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, n) tenant_id = self._get_tenant_id_for_create(context, n)
with context.session.begin(): with context.session.begin(subtransactions=True):
network = models_v2.Network(tenant_id=tenant_id, network = models_v2.Network(tenant_id=tenant_id,
id=n.get('id') or utils.str_uuid(), id=n.get('id') or utils.str_uuid(),
name=n['name'], name=n['name'],
@ -721,6 +748,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
filters=filters, fields=fields, filters=filters, fields=fields,
verbose=verbose) verbose=verbose)
def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)
def create_subnet(self, context, subnet): def create_subnet(self, context, subnet):
s = subnet['subnet'] s = subnet['subnet']
net = netaddr.IPNetwork(s['cidr']) net = netaddr.IPNetwork(s['cidr'])
@ -728,7 +758,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1)) s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
tenant_id = self._get_tenant_id_for_create(context, s) tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin(): with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"]) network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr']) self._validate_subnet_cidr(network, s['cidr'])
subnet = models_v2.Subnet(tenant_id=tenant_id, subnet = models_v2.Subnet(tenant_id=tenant_id,
@ -780,13 +810,16 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
filters=filters, fields=fields, filters=filters, fields=fields,
verbose=verbose) verbose=verbose)
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
def create_port(self, context, port): def create_port(self, context, port):
p = port['port'] p = port['port']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises # unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p) tenant_id = self._get_tenant_id_for_create(context, p)
with context.session.begin(): with context.session.begin(subtransactions=True):
network = self._get_network(context, p["network_id"]) network = self._get_network(context, p["network_id"])
# Ensure that a MAC address is defined and it is unique on the # Ensure that a MAC address is defined and it is unique on the
@ -817,7 +850,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
# Update the allocated IP's # Update the allocated IP's
if ips: if ips:
with context.session.begin(): with context.session.begin(subtransactions=True):
for ip in ips: for ip in ips:
LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'], LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'],
port['network_id'], ip['subnet_id'], port.id) port['network_id'], ip['subnet_id'], port.id)

View File

@ -196,7 +196,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
super(LinuxBridgePluginV2, self).delete_network(context, super(LinuxBridgePluginV2, self).delete_network(context,
net['id']) net['id'])
raise raise
return net return net
def update_network(self, context, id, network): def update_network(self, context, id, network):

View File

@ -40,8 +40,8 @@ def get_vlans():
return [(binding.vlan_id, binding.network_id) for binding in bindings] return [(binding.vlan_id, binding.network_id) for binding in bindings]
def get_vlan(net_id): def get_vlan(net_id, session=None):
session = db.get_session() session = session or db.get_session()
try: try:
binding = (session.query(ovs_models_v2.VlanBinding). binding = (session.query(ovs_models_v2.VlanBinding).
filter_by(network_id=net_id). filter_by(network_id=net_id).
@ -51,11 +51,10 @@ def get_vlan(net_id):
return binding.vlan_id return binding.vlan_id
def add_vlan_binding(vlan_id, net_id): def add_vlan_binding(vlan_id, net_id, session):
session = db.get_session() with session.begin(subtransactions=True):
binding = ovs_models_v2.VlanBinding(vlan_id, net_id) binding = ovs_models_v2.VlanBinding(vlan_id, net_id)
session.add(binding) session.add(binding)
session.flush()
return binding return binding
@ -114,10 +113,9 @@ def get_vlan_id(vlan_id):
return None return None
def reserve_vlan_id(): def reserve_vlan_id(session):
"""Reserve an unused vlan_id""" """Reserve an unused vlan_id"""
session = db.get_session()
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
record = (session.query(ovs_models_v2.VlanID). record = (session.query(ovs_models_v2.VlanID).
filter_by(vlan_used=False). filter_by(vlan_used=False).
@ -129,14 +127,13 @@ def reserve_vlan_id():
return record.vlan_id return record.vlan_id
def reserve_specific_vlan_id(vlan_id): def reserve_specific_vlan_id(vlan_id, session):
"""Reserve a specific vlan_id""" """Reserve a specific vlan_id"""
if vlan_id < 1 or vlan_id > 4094: if vlan_id < 1 or vlan_id > 4094:
msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id
raise q_exc.InvalidInput(error_message=msg) raise q_exc.InvalidInput(error_message=msg)
session = db.get_session()
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
try: try:
record = (session.query(ovs_models_v2.VlanID). record = (session.query(ovs_models_v2.VlanID).

View File

@ -177,6 +177,10 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
be updated to take advantage of it. be updated to take advantage of it.
""" """
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
supported_extension_aliases = ["provider"] supported_extension_aliases = ["provider"]
def __init__(self, configfile=None): def __init__(self, configfile=None):
@ -227,7 +231,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
def _extend_network_dict(self, context, network): def _extend_network_dict(self, context, network):
if self._check_provider_view_auth(context, network): if self._check_provider_view_auth(context, network):
if not self.enable_tunneling: if not self.enable_tunneling:
network['provider:vlan_id'] = ovs_db_v2.get_vlan(network['id']) network['provider:vlan_id'] = ovs_db_v2.get_vlan(
network['id'], context.session)
def create_network(self, context, network): def create_network(self, context, network):
net = super(OVSQuantumPluginV2, self).create_network(context, network) net = super(OVSQuantumPluginV2, self).create_network(context, network)
@ -235,15 +240,15 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
vlan_id = network['network'].get('provider:vlan_id') vlan_id = network['network'].get('provider:vlan_id')
if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED): if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED):
self._enforce_provider_set_auth(context, net) self._enforce_provider_set_auth(context, net)
ovs_db_v2.reserve_specific_vlan_id(vlan_id) ovs_db_v2.reserve_specific_vlan_id(vlan_id, context.session)
else: else:
vlan_id = ovs_db_v2.reserve_vlan_id() vlan_id = ovs_db_v2.reserve_vlan_id(context.session)
except Exception: except Exception:
super(OVSQuantumPluginV2, self).delete_network(context, net['id']) super(OVSQuantumPluginV2, self).delete_network(context, net['id'])
raise raise
LOG.debug("Created network: %s" % net['id']) LOG.debug("Created network: %s" % net['id'])
ovs_db_v2.add_vlan_binding(vlan_id, str(net['id'])) ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']), context.session)
self._extend_network_dict(context, net) self._extend_network_dict(context, net)
return net return net

View File

@ -63,46 +63,50 @@ class OVSVlanIdsTest(unittest2.TestCase):
self.assertIsNone(ovs_db_v2.get_vlan_id(VLAN_MAX + 5 + 1)) self.assertIsNone(ovs_db_v2.get_vlan_id(VLAN_MAX + 5 + 1))
def test_vlan_id_pool(self): def test_vlan_id_pool(self):
session = db.get_session()
vlan_ids = set() vlan_ids = set()
for x in xrange(VLAN_MIN, VLAN_MAX + 1): for x in xrange(VLAN_MIN, VLAN_MAX + 1):
vlan_id = ovs_db_v2.reserve_vlan_id() vlan_id = ovs_db_v2.reserve_vlan_id(db.get_session())
self.assertGreaterEqual(vlan_id, VLAN_MIN) self.assertGreaterEqual(vlan_id, VLAN_MIN)
self.assertLessEqual(vlan_id, VLAN_MAX) self.assertLessEqual(vlan_id, VLAN_MAX)
vlan_ids.add(vlan_id) vlan_ids.add(vlan_id)
with self.assertRaises(q_exc.NoNetworkAvailable): with self.assertRaises(q_exc.NoNetworkAvailable):
vlan_id = ovs_db_v2.reserve_vlan_id() vlan_id = ovs_db_v2.reserve_vlan_id(session)
for vlan_id in vlan_ids: for vlan_id in vlan_ids:
ovs_db_v2.release_vlan_id(vlan_id) ovs_db_v2.release_vlan_id(vlan_id)
def test_invalid_specific_vlan_id(self): def test_invalid_specific_vlan_id(self):
session = db.get_session()
with self.assertRaises(q_exc.InvalidInput): with self.assertRaises(q_exc.InvalidInput):
vlan_id = ovs_db_v2.reserve_specific_vlan_id(0) vlan_id = ovs_db_v2.reserve_specific_vlan_id(0, session)
with self.assertRaises(q_exc.InvalidInput): with self.assertRaises(q_exc.InvalidInput):
vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095) vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095, session)
def test_specific_vlan_id_inside_pool(self): def test_specific_vlan_id_inside_pool(self):
session = db.get_session()
vlan_id = VLAN_MIN + 5 vlan_id = VLAN_MIN + 5
self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
ovs_db_v2.reserve_specific_vlan_id(vlan_id) ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
with self.assertRaises(q_exc.VlanIdInUse): with self.assertRaises(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan_id(vlan_id) ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
ovs_db_v2.release_vlan_id(vlan_id) ovs_db_v2.release_vlan_id(vlan_id)
self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
def test_specific_vlan_id_outside_pool(self): def test_specific_vlan_id_outside_pool(self):
session = db.get_session()
vlan_id = VLAN_MAX + 5 vlan_id = VLAN_MAX + 5
self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id))
ovs_db_v2.reserve_specific_vlan_id(vlan_id) ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
with self.assertRaises(q_exc.VlanIdInUse): with self.assertRaises(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan_id(vlan_id) ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
ovs_db_v2.release_vlan_id(vlan_id) ovs_db_v2.release_vlan_id(vlan_id)
self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id))

View File

@ -551,16 +551,17 @@ class JSONV2TestCase(APIv2TestBase):
self.assertEqual(res.status_int, 422) self.assertEqual(res.status_int, 422)
def test_create_bulk(self): def test_create_bulk(self):
data = {'networks': [{'name': 'net1', 'admin_state_up': True, data = {'networks': [{'name': 'net1',
'admin_state_up': True,
'tenant_id': _uuid()}, 'tenant_id': _uuid()},
{'name': 'net2', 'admin_state_up': True, {'name': 'net2',
'admin_state_up': True,
'tenant_id': _uuid()}]} 'tenant_id': _uuid()}]}
def side_effect(context, network): def side_effect(context, network):
nets = network.copy() net = network.copy()
for net in nets['networks']: net['network'].update({'subnets': []})
net.update({'subnets': []}) return net['network']
return nets
instance = self.plugin.return_value instance = self.plugin.return_value
instance.create_network.side_effect = side_effect instance.create_network.side_effect = side_effect
@ -904,7 +905,6 @@ class ExtensionTestCase(unittest.TestCase):
self.api = None self.api = None
self.plugin = None self.plugin = None
cfg.CONF.reset() cfg.CONF.reset()
# Restore the global RESOURCE_ATTRIBUTE_MAP # Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import contextlib import contextlib
import copy
import logging import logging
import mock import mock
import os import os
@ -36,6 +37,7 @@ from quantum.wsgi import Serializer, JSONDeserializer
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
ROOTDIR = os.path.dirname(os.path.dirname(__file__)) ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc') ETCDIR = os.path.join(ROOTDIR, 'etc')
@ -62,10 +64,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
'application/json': json_deserializer, 'application/json': json_deserializer,
} }
plugin = test_config.get('plugin_name_v2', plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
'quantum.db.db_base_plugin_v2.'
'QuantumDbPluginV2')
LOG.debug("db plugin test, the plugin is:%s", plugin)
# Create the default configurations # Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')] args = ['--config-file', etcdir('quantum.conf.test')]
config.parse(args=args) config.parse(args=args)
@ -74,6 +73,14 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
self.api = APIRouter() self.api = APIRouter()
def _is_native_bulk_supported():
plugin_obj = QuantumManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"
% plugin_obj.__class__.__name__)
return getattr(plugin_obj, native_bulk_attr_name, False)
self._skip_native_bulk = not _is_native_bulk_supported()
def tearDown(self): def tearDown(self):
super(QuantumDbPluginV2TestCase, self).tearDown() super(QuantumDbPluginV2TestCase, self).tearDown()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
@ -118,6 +125,28 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
data = self._deserializers[ctype].deserialize(response.body)['body'] data = self._deserializers[ctype].deserialize(response.body)['body']
return data return data
def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
""" Creates a bulk request for any kind of resource """
objects = []
collection = "%ss" % resource
for i in range(0, number):
obj = copy.deepcopy(data)
obj[resource]['name'] = "%s_%s" % (name, i)
if 'override' in kwargs and i in kwargs['override']:
obj[resource].update(kwargs['override'][i])
objects.append(obj)
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['quantum.context'] = kwargs['context']
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_status_up, **kwargs): def _create_network(self, fmt, name, admin_status_up, **kwargs):
data = {'network': {'name': name, data = {'network': {'name': name,
'admin_state_up': admin_status_up, 'admin_state_up': admin_status_up,
@ -134,6 +163,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
return network_req.get_response(self.api) return network_req.get_response(self.api)
def _create_network_bulk(self, fmt, number, name,
admin_status_up, **kwargs):
base_data = {'network': {'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
def _create_subnet(self, fmt, net_id, cidr, def _create_subnet(self, fmt, net_id, cidr,
expected_res_status=None, **kwargs): expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id, data = {'subnet': {'network_id': net_id,
@ -157,6 +192,19 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
self.assertEqual(subnet_res.status_int, expected_res_status) self.assertEqual(subnet_res.status_int, expected_res_status)
return subnet_res return subnet_res
def _create_subnet_bulk(self, fmt, number, net_id, name,
ip_version=4, **kwargs):
base_data = {'subnet': {'network_id': net_id,
'ip_version': ip_version,
'tenant_id': self._tenant_id}}
# auto-generate cidrs as they should not overlap
overrides = dict((k, v)
for (k, v) in zip(range(0, number),
[{'cidr': "10.0.%s.0/24" % num}
for num in range(0, number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs): def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
content_type = 'application/' + fmt content_type = 'application/' + fmt
data = {'port': {'network_id': net_id, data = {'port': {'network_id': net_id,
@ -196,6 +244,13 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
self.assertEqual(port_res.status_int, expected_res_status) self.assertEqual(port_res.status_int, expected_res_status)
return port_res return port_res
def _create_port_bulk(self, fmt, number, net_id, name,
admin_status_up, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
def _make_subnet(self, fmt, network, gateway, cidr, def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True): allocation_pools=None, ip_version=4, enable_dhcp=True):
res = self._create_subnet(fmt, res = self._create_subnet(fmt,
@ -220,6 +275,29 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
req = self.new_delete_request(collection, id) req = self.new_delete_request(collection, id)
req.get_response(self.api) req.get_response(self.api)
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
""" Invoked by test cases for injecting failures in plugin """
def second_call(*args, **kwargs):
raise Exception('boom')
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
def _validate_behavior_on_bulk_failure(self, res, collection):
self.assertEqual(res.status_int, 500)
req = self.new_list_request(collection)
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
items = self.deserialize('json', res)
self.assertEqual(len(items[collection]), 0)
def _validate_behavior_on_bulk_success(self, res, collection,
names=['test_0', 'test_1']):
self.assertEqual(res.status_int, 201)
items = self.deserialize('json', res)[collection]
self.assertEqual(len(items), 2)
self.assertEqual(items[0]['name'], 'test_0')
self.assertEqual(items[1]['name'], 'test_1')
@contextlib.contextmanager @contextlib.contextmanager
def network(self, name='net1', def network(self, name='net1',
admin_status_up=True, admin_status_up=True,
@ -429,6 +507,90 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(port['port'][k], v) self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port']) self.assertTrue('mac_address' in port['port'])
def test_create_ports_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
with self.network() as net:
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
def test_create_ports_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
def test_create_ports_bulk_wrong_input(self):
with self.network() as net:
overrides = {1: {'admin_state_up': 'doh'}}
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True,
override=overrides)
self.assertEqual(res.status_int, 400)
req = self.new_list_request('ports')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
ports = self.deserialize('json', res)
self.assertEqual(len(ports['ports']), 0)
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = QuantumManager.get_plugin().create_port
with mock.patch.object(QuantumManager.get_plugin(),
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_port_bulk('json', 2,
net['network']['id'],
'test',
True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'ports')
def test_create_ports_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
ctx = context.get_admin_context()
with self.network() as net:
orig = QuantumManager._instance.plugin.create_port
with mock.patch.object(QuantumManager._instance.plugin,
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_port_bulk('json', 2, net['network']['id'],
'test', True, context=ctx)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'ports')
def test_list_ports(self): def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2): with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json') req = self.new_list_request('ports', 'json')
@ -1061,6 +1223,77 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
network['network']['id']) network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409) self.assertEqual(req.get_response(self.api).status_int, 409)
def test_create_networks_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
res = self._create_network_bulk('json', 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
def test_create_networks_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
res = self._create_network_bulk('json', 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
def test_create_networks_bulk_wrong_input(self):
res = self._create_network_bulk('json', 2, 'test', True,
override={1:
{'admin_state_up': 'doh'}})
self.assertEqual(res.status_int, 400)
req = self.new_list_request('networks')
res = req.get_response(self.api)
self.assertEquals(res.status_int, 200)
nets = self.deserialize('json', res)
self.assertEqual(len(nets['networks']), 0)
def test_create_networks_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
orig = QuantumManager.get_plugin().create_network
#ensures the API choose the emulation code path
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with mock.patch.object(QuantumManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk('json', 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'networks')
def test_create_networks_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
orig = QuantumManager.get_plugin().create_network
with mock.patch.object(QuantumManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk('json', 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'networks')
def test_list_networks(self): def test_list_networks(self):
with self.network(name='net1') as net1: with self.network(name='net1') as net1:
with self.network(name='net2') as net2: with self.network(name='net2') as net2:
@ -1157,6 +1390,77 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
pass pass
self.assertEquals(ctx_manager.exception.code, 400) self.assertEquals(ctx_manager.exception.code, 400)
def test_create_subnets_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
with self.network() as net:
res = self._create_subnet_bulk('json', 2, net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
def test_create_subnets_bulk_emulated(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
def test_create_subnets_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
#ensures the API choose the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = QuantumManager.get_plugin().create_subnet
with mock.patch.object(QuantumManager.get_plugin(),
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'subnets')
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
orig = QuantumManager._instance.plugin.create_subnet
with mock.patch.object(QuantumManager._instance.plugin,
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk('json', 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(res, 'subnets')
def test_delete_subnet(self): def test_delete_subnet(self):
gateway_ip = '10.0.0.1' gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24' cidr = '10.0.0.0/24'