Allow None for binding:profile attribute

We need to pass None in binding:profile to allow an administrator
to clear binding:profile attribute.

Closes-Bug: #1220011

Adds dedicated unit tests to the plugins which uses binding:profile
attribute (Mellanox and NEC plugins at now).

This commit also adds common unit tests for binding:profile to
the common PortBindingTestCase class.
- create_port with binding:profile whose value is None or {}
- update_port with binding:profile whose value is None or {}
- Reject binding:profile from non-admin user

Note that _make_port() in BigSwitch plugin test is updated
to allow passing arg_list() from the base test class.

Fix a bug in NEC plugin that 500 is returned when putting
binding:profile None to a port whose binding:profile is
already None (Closes-Bug: #1220720)

Change-Id: I146afe961cd445a023adc7233588d8034fdb8437
This commit is contained in:
Akihiro MOTOKI 2013-09-03 21:57:53 +09:00
parent cf353e24c8
commit a410ae39ec
6 changed files with 168 additions and 12 deletions

View File

@ -63,7 +63,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
PROFILE: {'allow_post': True, 'allow_put': True, PROFILE: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED, 'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True, 'enforce_policy': True,
'validate': {'type:dict': None}, 'validate': {'type:dict_or_none': None},
'is_visible': True}, 'is_visible': True},
CAPABILITIES: {'allow_post': False, 'allow_put': False, CAPABILITIES: {'allow_post': False, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED, 'default': attributes.ATTR_NOT_SPECIFIED,

View File

@ -430,6 +430,9 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
portinfo_changed = 'DEL' portinfo_changed = 'DEL'
portinfo = None portinfo = None
ndb.del_portinfo(context.session, port['id']) ndb.del_portinfo(context.session, port['id'])
else:
portinfo = None
portinfo_changed = None
self._extend_port_dict_binding_portinfo(port, portinfo) self._extend_port_dict_binding_portinfo(port, portinfo)
return portinfo_changed return portinfo_changed

View File

@ -48,6 +48,12 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
self.assertFalse(portbindings.VIF_TYPE in port) self.assertFalse(portbindings.VIF_TYPE in port)
self.assertFalse(portbindings.CAPABILITIES in port) self.assertFalse(portbindings.CAPABILITIES in port)
def _get_non_admin_context(self):
return context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
def test_port_vif_details(self): def test_port_vif_details(self):
with self.port(name='name') as port: with self.port(name='name') as port:
port_id = port['port']['id'] port_id = port['port']['id']
@ -58,10 +64,7 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
port = self._show('ports', port_id, neutron_context=ctx)['port'] port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbindings(port) self._check_response_portbindings(port)
# By default user is admin - now test non admin user # By default user is admin - now test non admin user
ctx = context.Context(user_id=None, ctx = self._get_non_admin_context()
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = self._show( non_admin_port = self._show(
'ports', port_id, neutron_context=ctx)['port'] 'ports', port_id, neutron_context=ctx)['port']
self._check_response_no_portbindings(non_admin_port) self._check_response_no_portbindings(non_admin_port)
@ -76,15 +79,81 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
for port in ports: for port in ports:
self._check_response_portbindings(port) self._check_response_portbindings(port)
# By default user is admin - now test non admin user # By default user is admin - now test non admin user
ctx = context.Context(user_id=None, ctx = self._get_non_admin_context()
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = self._list('ports', neutron_context=ctx)['ports'] ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(len(ports), 2) self.assertEqual(len(ports), 2)
for non_admin_port in ports: for non_admin_port in ports:
self._check_response_no_portbindings(non_admin_port) self._check_response_no_portbindings(non_admin_port)
def _check_default_port_binding_profile(self, port):
# For plugins which does not use binding:profile attr
# we just check an operation for the port succeed.
self.assertIn('id', port)
def _test_create_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_default_port_binding_profile(port['port'])
def test_create_port_binding_profile_none(self):
self._test_create_port_binding_profile(None)
def test_create_port_binding_profile_with_empty_dict(self):
self._test_create_port_binding_profile({})
def _test_update_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port() as port:
# print "(1) %s" % port
self._check_default_port_binding_profile(port['port'])
port_id = port['port']['id']
ctx = context.get_admin_context()
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self._check_default_port_binding_profile(port)
def test_update_port_binding_profile_none(self):
self._test_update_port_binding_profile(None)
def test_update_port_binding_profile_with_empty_dict(self):
self._test_update_port_binding_profile({})
def test_port_create_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
# succeed without binding:profile
with self.port(subnet=subnet1,
set_context=True, tenant_id='test'):
pass
# fail with binding:profile
try:
with self.port(subnet=subnet1,
expected_res_status=403,
arg_list=(portbindings.PROFILE,),
set_context=True, tenant_id='test',
**profile_arg):
pass
except exc.HTTPClientError:
pass
def test_port_update_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
with self.network() as net1:
with self.subnet(network=net1) as subnet1:
with self.port(subnet=subnet1) as port:
# By default user is admin - now test non admin user
# Note that 404 is returned when prohibit by policy.
# See comment for PolicyNotAuthorized except clause
# in update() in neutron.api.v2.base.Controller.
port_id = port['port']['id']
ctx = self._get_non_admin_context()
port = self._update('ports', port_id,
{'port': profile_arg},
expected_code=404,
neutron_context=ctx)
class PortBindingsHostTestCaseMixin(object): class PortBindingsHostTestCaseMixin(object):
fmt = 'json' fmt = 'json'

View File

@ -222,9 +222,12 @@ class TestBigSwitchVIFOverride(test_plugin.TestPortsV2,
res = self.deserialize(self.fmt, req.get_response(self.api)) res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['binding:vif_type'], self.VIF_TYPE) self.assertEqual(res['port']['binding:vif_type'], self.VIF_TYPE)
def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs): def _make_port(self, fmt, net_id, expected_res_status=None, arg_list=None,
**kwargs):
arg_list = arg_list or ()
arg_list += ('binding:host_id', )
res = self._create_port(fmt, net_id, expected_res_status, res = self._create_port(fmt, net_id, expected_res_status,
('binding:host_id', ), **kwargs) arg_list, **kwargs)
# Things can go wrong - raise HTTP exc with res code only # Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests # so it can be caught by unit tests
if res.status_int >= 400: if res.status_int >= 400:

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from webob import exc
from neutron.extensions import portbindings
from neutron.plugins.mlnx.common import constants from neutron.plugins.mlnx.common import constants
from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin from neutron.tests.unit import test_db_plugin as test_plugin
@ -53,6 +56,45 @@ class TestMlnxPortBinding(MlnxPluginV2TestCase,
VIF_TYPE = constants.VIF_TYPE_DIRECT VIF_TYPE = constants.VIF_TYPE_DIRECT
HAS_PORT_FILTER = False HAS_PORT_FILTER = False
def _check_default_port_binding_profole(self, port,
expected_vif_type=None):
if expected_vif_type is None:
expected_vif_type = constants.VIF_TYPE_DIRECT
p = port['port']
self.assertIn('id', p)
self.assertEqual(expected_vif_type, p[portbindings.VIF_TYPE])
self.assertEqual({'physical_network': 'default'},
p[portbindings.PROFILE])
def test_create_port_no_binding_profile(self):
with self.port() as port:
self._check_default_port_binding_profole(port)
def test_create_port_binding_profile_none(self):
profile_arg = {portbindings.PROFILE: None}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_default_port_binding_profole(port)
def test_create_port_binding_profile_vif_type(self):
for vif_type in [constants.VIF_TYPE_HOSTDEV,
constants.VIF_TYPE_DIRECT]:
profile_arg = {portbindings.PROFILE:
{constants.VNIC_TYPE: vif_type}}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_default_port_binding_profole(
port, expected_vif_type=vif_type)
def test_create_port_binding_profile_with_empty_dict(self):
profile_arg = {portbindings.PROFILE: {}}
try:
with self.port(arg_list=(portbindings.PROFILE,),
expected_res_status=400, **profile_arg):
pass
except exc.HTTPClientError:
pass
class TestMlnxPortBindingNoSG(TestMlnxPortBinding): class TestMlnxPortBindingNoSG(TestMlnxPortBinding):
HAS_PORT_FILTER = False HAS_PORT_FILTER = False

View File

@ -149,13 +149,34 @@ class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
self.assertEqual(self.ofc.create_ofc_port.call_count, 2) self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1) self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
# delete portinfo # delete portinfo with an empty dict
profile_arg = {portbindings.PROFILE: {}} profile_arg = {portbindings.PROFILE: {}}
port = self._update('ports', port_id, {'port': profile_arg}, port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port'] neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 2) self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 2) self.assertEqual(self.ofc.delete_ofc_port.call_count, 2)
def test_port_update_portinfo_detail_clear_with_none(self):
with self.port() as port:
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
port_id = port['port']['id']
ctx = context.get_admin_context()
# add portinfo
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
# delete portinfo with None
profile_arg = {portbindings.PROFILE: None}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
def test_port_create_portinfo_with_empty_dict(self): def test_port_create_portinfo_with_empty_dict(self):
profile_arg = {portbindings.PROFILE: {}} profile_arg = {portbindings.PROFILE: {}}
with self.port(arg_list=(portbindings.PROFILE,), with self.port(arg_list=(portbindings.PROFILE,),
@ -174,6 +195,24 @@ class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
self.assertEqual(self.ofc.create_ofc_port.call_count, 1) self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0) self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
def test_port_create_portinfo_with_none(self):
profile_arg = {portbindings.PROFILE: None}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbinding_no_profile(port['port'])
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
# add portinfo
ctx = context.get_admin_context()
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self._check_response_portbinding_profile(port)
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
def test_port_create_portinfo_non_admin(self): def test_port_create_portinfo_non_admin(self):
with self.network(set_context=True, tenant_id='test') as net1: with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1: with self.subnet(network=net1) as subnet1: