Export portinfo thru portbinding ext in NEC plugin

blueprint nec-port-binding

* Add host-id support in port-binding extension.
* Expose portinfo thourgh binding:profile attr in a port.
  portinfo is a mapping between neutron port id and OpenFlow switch
  physical information (datapath_id and port_no)

It changes the following in portinfo db model
* Add cascade on delete to delete an associated portinfo
  when deleting the port.
* Use joined query for portinfo model to retrieve an associated
  portinfo when querying a port.

Change-Id: Id88d93dc0770a1290714436324b1b53c0b023eeb
This commit is contained in:
Akihiro MOTOKI 2013-08-21 15:43:19 +09:00
parent b944ed485c
commit ca30f7712a
8 changed files with 605 additions and 97 deletions

View File

@ -0,0 +1,65 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""NEC Port Binding
Revision ID: 2a3bae1ceb8
Revises: 46a0efbd8f0
Create Date: 2013-08-22 11:09:19.955386
"""
# revision identifiers, used by Alembic.
revision = '2a3bae1ceb8'
down_revision = '46a0efbd8f0'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.plugins.nec.nec_plugin.NECPluginV2'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_table(
'portbindingports',
sa.Column('port_id', sa.String(length=36), nullable=False),
sa.Column('host', sa.String(length=255), nullable=False),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('port_id')
)
op.create_foreign_key(
'portinfos_ibfk_1',
source='portinfos', referent='ports',
local_cols=['id'], remote_cols=['id'],
ondelete='CASCADE')
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_constraint('portinfos_ibfk_1', 'portinfos', 'foreignkey')
op.drop_table('portbindingports')

View File

@ -33,3 +33,14 @@ class OFCConsistencyBroken(qexc.NeutronException):
class PortInfoNotFound(qexc.NotFound):
message = _("PortInfo %(id)s could not be found")
class ProfilePortInfoInvalidDataPathId(qexc.InvalidInput):
message = _('Invalid input for operation: '
'portinfo:datapath_id should be a hex string '
'with at most 8 bytes')
class ProfilePortInfoInvalidPortNo(qexc.InvalidInput):
message = _('Invalid input for operation: '
'portinfo:port_no should be [0:65535]')

View File

@ -16,6 +16,7 @@
# @author: Ryota MIBU
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.db import model_base
from neutron.db import models_v2
@ -74,9 +75,17 @@ class OFCFilter(model_base.BASEV2, models_v2.HasId, HasNeutronId):
"""Represents a Filter on OpenFlow Network/Controller."""
class PortInfo(model_base.BASEV2, models_v2.HasId):
class PortInfo(model_base.BASEV2):
"""Represents a Virtual Interface."""
id = sa.Column(sa.String(36),
sa.ForeignKey('ports.id', ondelete="CASCADE"),
primary_key=True)
datapath_id = sa.Column(sa.String(36), nullable=False)
port_no = sa.Column(sa.Integer, nullable=False)
vlan_id = sa.Column(sa.Integer, nullable=False)
mac = sa.Column(sa.String(32), nullable=False)
port = orm.relationship(
models_v2.Port,
backref=orm.backref("portinfo",
lazy='joined', uselist=False,
cascade='delete'))

View File

@ -19,6 +19,7 @@
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
@ -31,6 +32,7 @@ from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
@ -55,7 +57,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
packet_filter.PacketFilterMixin,
portbindings_base.PortBindingBaseMixin):
portbindings_db.PortBindingMixin):
"""NECPluginV2 controls an OpenFlow Controller.
The Neutron NECPluginV2 maps L2 logical networks to L2 virtualized networks
@ -342,6 +344,111 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
'security-group' in self.supported_extension_aliases}}
return binding
def _extend_port_dict_binding_portinfo(self, port_res, portinfo):
if portinfo:
port_res[portbindings.PROFILE] = {
'portinfo:datapath_id': portinfo['datapath_id'],
'portinfo:port_no': portinfo['port_no'],
}
elif portbindings.PROFILE in port_res:
del port_res[portbindings.PROFILE]
def _validate_portinfo(self, profile):
key_specs = {
'portinfo:datapath_id': {'type:string': None, 'required': True},
'portinfo:port_no': {'type:non_negative': None, 'required': True,
'convert_to': attrs.convert_to_int}
}
msg = attrs._validate_dict_or_empty(profile, key_specs=key_specs)
if msg:
raise q_exc.InvalidInput(error_message=msg)
datapath_id = profile.get('portinfo:datapath_id')
port_no = profile.get('portinfo:port_no')
try:
dpid = int(datapath_id, 16)
except ValueError:
raise nexc.ProfilePortInfoInvalidDataPathId()
if dpid > 0xffffffffffffffffL:
raise nexc.ProfilePortInfoInvalidDataPathId()
# Make sure dpid is a hex string beginning with 0x.
dpid = hex(dpid)
if int(port_no) > 65535:
raise nexc.ProfilePortInfoInvalidPortNo()
return {'datapath_id': dpid, 'port_no': port_no}
def _process_portbindings_portinfo_create(self, context, port_data, port):
"""Add portinfo according to bindings:profile in create_port().
:param context: neutron api request context
:param port_data: port attributes passed in PUT request
:param port: port attributes to be returned
"""
profile = port_data.get(portbindings.PROFILE)
# If portbindings.PROFILE is None, unspecified or an empty dict
# it is regarded that portbinding.PROFILE is not set.
profile_set = attrs.is_attr_set(profile) and profile
if profile_set:
portinfo = self._validate_portinfo(profile)
portinfo['mac'] = port['mac_address']
ndb.add_portinfo(context.session, port['id'], **portinfo)
else:
portinfo = None
self._extend_port_dict_binding_portinfo(port, portinfo)
def _process_portbindings_portinfo_update(self, context, port_data, port):
"""Update portinfo according to bindings:profile in update_port().
:param context: neutron api request context
:param port_data: port attributes passed in PUT request
:param port: port attributes to be returned
:returns: 'ADD', 'MOD', 'DEL' or None
"""
if portbindings.PROFILE not in port_data:
return
profile = port_data.get(portbindings.PROFILE)
# If binding:profile is None or an empty dict,
# it means binding:.profile needs to be cleared.
# TODO(amotoki): Allow Make None in binding:profile in
# the API layer. See LP bug #1220011.
profile_set = attrs.is_attr_set(profile) and profile
cur_portinfo = ndb.get_portinfo(context.session, port['id'])
if profile_set:
portinfo = self._validate_portinfo(profile)
portinfo_changed = 'ADD'
if cur_portinfo:
if (portinfo['datapath_id'] == cur_portinfo.datapath_id and
portinfo['port_no'] == cur_portinfo.port_no):
return
ndb.del_portinfo(context.session, port['id'])
portinfo_changed = 'MOD'
portinfo['mac'] = port['mac_address']
ndb.add_portinfo(context.session, port['id'], **portinfo)
elif cur_portinfo:
portinfo_changed = 'DEL'
portinfo = None
ndb.del_portinfo(context.session, port['id'])
self._extend_port_dict_binding_portinfo(port, portinfo)
return portinfo_changed
def extend_port_dict_binding(self, port_res, port_db):
super(NECPluginV2, self).extend_port_dict_binding(port_res, port_db)
self._extend_port_dict_binding_portinfo(port_res, port_db.portinfo)
def _process_portbindings_create(self, context, port_data, port):
super(NECPluginV2, self)._process_portbindings_create_and_update(
context, port_data, port)
self._process_portbindings_portinfo_create(context, port_data, port)
def _process_portbindings_update(self, context, port_data, port):
super(NECPluginV2, self)._process_portbindings_create_and_update(
context, port_data, port)
portinfo_changed = self._process_portbindings_portinfo_update(
context, port_data, port)
return portinfo_changed
def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
@ -353,15 +460,50 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
port = super(NECPluginV2, self).create_port(context, port)
self._process_portbindings_create_and_update(context,
port_data,
port)
self._process_portbindings_create(context, port_data, port)
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
return self.activate_port_if_ready(context, port)
def _update_ofc_port_if_required(self, context, old_port, new_port,
portinfo_changed):
def get_ofport_exist(port):
return (port['admin_state_up'] and
bool(port.get(portbindings.PROFILE)))
# Determine it is required to update OFC port
need_add = False
need_del = False
need_packet_filter_update = False
old_ofport_exist = get_ofport_exist(old_port)
new_ofport_exist = get_ofport_exist(new_port)
if old_port['admin_state_up'] != new_port['admin_state_up']:
if new_port['admin_state_up']:
need_add |= new_ofport_exist
else:
need_del |= old_ofport_exist
if portinfo_changed:
if portinfo_changed in ['DEL', 'MOD']:
need_del |= old_ofport_exist
if portinfo_changed in ['ADD', 'MOD']:
need_add |= new_ofport_exist
need_packet_filter_update |= True
# Update OFC port if required
if need_del:
self.deactivate_port(context, new_port)
if need_packet_filter_update:
self.deactivate_packet_filters_by_port(context, id)
if need_add:
if need_packet_filter_update:
self.activate_packet_filters_by_port(context, id)
self.activate_port_if_ready(context, new_port)
def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port.
@ -375,9 +517,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
self._process_portbindings_create_and_update(context,
port['port'],
new_port)
portinfo_changed = self._process_portbindings_update(
context, port['port'], new_port)
need_port_update_notify = self.update_security_group_on_port(
context, id, port, old_port, new_port)
@ -386,13 +527,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
if need_port_update_notify:
self.notifier.port_update(context, new_port)
changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
if changed:
if new_port['admin_state_up']:
new_port = self.activate_port_if_ready(context, new_port)
else:
new_port = self.deactivate_port(context, new_port)
self._update_ofc_port_if_required(context, old_port, new_port,
portinfo_changed)
return new_port
def delete_port(self, context, id, l3_port_check=True):
@ -510,10 +646,10 @@ class NECPluginV2RPCCallbacks(object):
"port_added message (port_id=%s)."), id)
continue
ndb.del_portinfo(session, id)
ndb.add_portinfo(session, id, datapath_id, p['port_no'],
mac=p.get('mac', ''))
port = self._get_port(rpc_context, id)
if port:
ndb.add_portinfo(session, id, datapath_id, p['port_no'],
mac=p.get('mac', ''))
# NOTE: Make sure that packet filters on this port exist while
# the port is active to avoid unexpected packet transfer.
if portinfo:

View File

@ -15,26 +15,24 @@
# under the License.
# @author: Ryota MIBU
import contextlib
import random
from neutron.common import constants as q_const
from neutron.db import api as db_api
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.db import models as nmodels # noqa
from neutron.tests import base
from neutron.tests.unit.nec import test_nec_plugin
class NECPluginV2DBTestBase(base.BaseTestCase):
class NECPluginV2DBTestBase(test_nec_plugin.NecPluginV2TestCase):
"""Class conisting of NECPluginV2 DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(NECPluginV2DBTestBase, self).setUp()
ndb.initialize()
self.session = db_api.get_session()
self.addCleanup(ndb.clear_db)
self.session = self.context.session
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test."""
@ -43,15 +41,18 @@ class NECPluginV2DBTestBase(base.BaseTestCase):
none = uuidutils.generate_uuid()
return ofc_id, neutron_id, none
def get_portinfo_random_params(self):
"""create random parameters for portinfo test."""
port_id = uuidutils.generate_uuid()
datapath_id = hex(random.randint(0, 0xffffffff))
port_no = random.randint(1, 100)
vlan_id = random.randint(q_const.MIN_VLAN_TAG, q_const.MAX_VLAN_TAG)
mac = ':'.join(["%02x" % random.randint(0, 0xff) for x in range(6)])
none = uuidutils.generate_uuid()
return port_id, datapath_id, port_no, vlan_id, mac, none
@contextlib.contextmanager
def portinfo_random_params(self):
with self.port() as port:
params = {'port_id': port['port']['id'],
'datapath_id': hex(random.randint(0, 0xffffffff)),
'port_no': random.randint(1, 100),
'vlan_id': random.randint(q_const.MIN_VLAN_TAG,
q_const.MAX_VLAN_TAG),
'mac': ':'.join(["%02x" % random.randint(0, 0xff)
for x in range(6)])
}
yield params
class NECPluginV2DBTest(NECPluginV2DBTestBase):
@ -122,45 +123,50 @@ class NECPluginV2DBTest(NECPluginV2DBTestBase):
'ofc_tenant', o)
self.assertEqual(None, tenant_none)
def _compare_portinfo(self, portinfo, expected):
self.assertEqual(portinfo.id, expected['port_id'])
self.assertEqual(portinfo.datapath_id, expected['datapath_id'])
self.assertEqual(portinfo.port_no, expected['port_no'])
self.assertEqual(portinfo.vlan_id, expected['vlan_id'])
self.assertEqual(portinfo.mac, expected['mac'])
def _add_portinfo(self, session, params):
return ndb.add_portinfo(session, params['port_id'],
params['datapath_id'], params['port_no'],
params['vlan_id'], params['mac'])
def testd_add_portinfo(self):
"""test add portinfo."""
i, d, p, v, m, n = self.get_portinfo_random_params()
portinfo = ndb.add_portinfo(self.session, i, d, p, v, m)
self.assertEqual(portinfo.id, i)
self.assertEqual(portinfo.datapath_id, d)
self.assertEqual(portinfo.port_no, p)
self.assertEqual(portinfo.vlan_id, v)
self.assertEqual(portinfo.mac, m)
with self.portinfo_random_params() as params:
portinfo = self._add_portinfo(self.session, params)
self._compare_portinfo(portinfo, params)
exception_raised = False
try:
ndb.add_portinfo(self.session, i, d, p, v, m)
self._add_portinfo(self.session, params)
except nexc.NECDBException:
exception_raised = True
self.assertTrue(exception_raised)
def teste_get_portinfo(self):
"""test get portinfo."""
i, d, p, v, m, n = self.get_portinfo_random_params()
ndb.add_portinfo(self.session, i, d, p, v, m)
portinfo = ndb.get_portinfo(self.session, i)
self.assertEqual(portinfo.id, i)
self.assertEqual(portinfo.datapath_id, d)
self.assertEqual(portinfo.port_no, p)
self.assertEqual(portinfo.vlan_id, v)
self.assertEqual(portinfo.mac, m)
with self.portinfo_random_params() as params:
self._add_portinfo(self.session, params)
portinfo = ndb.get_portinfo(self.session, params['port_id'])
self._compare_portinfo(portinfo, params)
portinfo_none = ndb.get_portinfo(self.session, n)
nonexist_id = uuidutils.generate_uuid()
portinfo_none = ndb.get_portinfo(self.session, nonexist_id)
self.assertEqual(None, portinfo_none)
def testf_del_portinfo(self):
"""test delete portinfo."""
i, d, p, v, m, n = self.get_portinfo_random_params()
ndb.add_portinfo(self.session, i, d, p, v, m)
portinfo = ndb.get_portinfo(self.session, i)
self.assertEqual(portinfo.id, i)
ndb.del_portinfo(self.session, i)
portinfo_none = ndb.get_portinfo(self.session, i)
with self.portinfo_random_params() as params:
self._add_portinfo(self.session, params)
portinfo = ndb.get_portinfo(self.session, params['port_id'])
self.assertEqual(portinfo.id, params['port_id'])
ndb.del_portinfo(self.session, params['port_id'])
portinfo_none = ndb.get_portinfo(self.session, params['port_id'])
self.assertEqual(None, portinfo_none)

View File

@ -24,15 +24,12 @@ from neutron.common.test_lib import test_config
from neutron.common import topics
from neutron import context
from neutron.db import db_base_plugin_v2
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec import nec_plugin
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nec import fake_ofc_manager
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = 'neutron.plugins.nec.nec_plugin.NECPluginV2'
@ -101,31 +98,13 @@ class TestNecV2HTTPResponse(test_plugin.TestV2HTTPResponse,
class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
pass
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
pass
class TestNecPortBinding(test_bindings.PortBindingsTestCase,
NecPluginV2TestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
def setUp(self):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
super(TestNecPortBinding, self).setUp()
class TestNecPortBindingNoSG(TestNecPortBinding):
HAS_PORT_FILTER = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
class TestNecPortsV2Callback(NecPluginV2TestCase):
def _get_portinfo(self, port_id):
@ -177,10 +156,10 @@ class TestNecPortsV2Callback(NecPluginV2TestCase):
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNone(self._get_portinfo(port_id))
# The port is expected to delete when exiting with-clause.
if not portinfo_delete_first:
# The port and portinfo is expected to delete when exiting with-clause.
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNotNone(self._get_portinfo(port_id))
self.assertIsNone(self._get_portinfo(port_id))
if not portinfo_delete_first:
self.rpcapi_update_ports(removed=[port_id])
# Ensure port deletion is called once.
@ -198,7 +177,7 @@ class TestNecPortsV2Callback(NecPluginV2TestCase):
def test_portinfo_added_unknown_port(self):
portinfo = {'id': 'dummy-p1', 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo])
self.assertIsNotNone(ndb.get_portinfo(self.context.session,
self.assertIsNone(ndb.get_portinfo(self.context.session,
'dummy-p1'))
self.assertEqual(self.ofc.exists_ofc_port.call_count, 0)
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
@ -234,8 +213,7 @@ class TestNecPortsV2Callback(NecPluginV2TestCase):
# No OFC operations are expected.
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertEqual(ndb.get_portinfo(self.context.session,
port_id).port_no, 456)
self.assertIsNone(ndb.get_portinfo(self.context.session, port_id))
def test_portinfo_change(self):
self._test_portinfo_change()

View File

@ -15,6 +15,8 @@
# under the License.
# @author: Ryota MIBU
import mock
from neutron import context
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import config
@ -24,6 +26,19 @@ from neutron.plugins.nec import ofc_manager
from neutron.tests import base
class FakePortInfo(object):
def __init__(self, id, datapath_id, port_no=0,
vlan_id=65535, mac='00:11:22:33:44:55'):
self.data = {'id': id, 'datapath_id': datapath_id,
'port_no': port_no, 'vlan_id': vlan_id, 'mac': mac}
def __getattr__(self, name):
if name in self.fields:
return self[name]
else:
raise AttributeError(name)
class OFCManagerTestBase(base.BaseTestCase):
"""Class conisting of OFCManager unit tests."""
@ -35,6 +50,7 @@ class OFCManagerTestBase(base.BaseTestCase):
self.addCleanup(ndb.clear_db)
self.ofc = ofc_manager.OFCManager()
self.ctx = context.get_admin_context()
self.addCleanup(mock.patch.stopall)
def get_random_params(self):
"""create random parameters for portinfo test."""
@ -98,44 +114,51 @@ class OFCManagerTest(OFCManagerTestBase):
self.ofc.delete_ofc_network(self.ctx, n, {'tenant_id': t})
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_network', n))
def _mock_get_portinfo(self, port_id, datapath_id='0xabc', port_no=1):
get_portinfo = mock.patch.object(ndb, 'get_portinfo').start()
fake_portinfo = FakePortInfo(id=port_id, datapath_id=datapath_id,
port_no=port_no)
get_portinfo.return_value = fake_portinfo
return get_portinfo
def testg_create_ofc_port(self):
"""test create ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
ndb.add_portinfo(self.ctx.session, p, "0xabc", 1, 65535,
"00:11:22:33:44:55")
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
port = ndb.get_ofc_item(self.ctx.session, 'ofc_port', p)
self.assertEqual(port.ofc_id, "ofc-" + p[:-4])
get_portinfo.assert_called_once_with(mock.ANY, p)
def testh_exists_ofc_port(self):
"""test exists_ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
ndb.add_portinfo(self.ctx.session, p, "0xabc", 2, 65535,
"00:12:22:33:44:55")
self.assertFalse(self.ofc.exists_ofc_port(self.ctx, p))
get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(self.ofc.exists_ofc_port(self.ctx, p))
get_portinfo.assert_called_once_with(mock.ANY, p)
def testi_delete_ofc_port(self):
"""test delete ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
ndb.add_portinfo(self.ctx.session, p, "0xabc", 3, 65535,
"00:13:22:33:44:55")
get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
self.ofc.delete_ofc_port(self.ctx, p, port)
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
get_portinfo.assert_called_once_with(mock.ANY, p)
def testj_create_ofc_packet_filter(self):
"""test create ofc_filter."""

View File

@ -0,0 +1,280 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 NEC Corporation
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Akihiro Motoki, NEC Corporation
from testtools import matchers
from webob import exc
from neutron.common import exceptions as q_exc
from neutron import context
from neutron.extensions import portbindings
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nec import test_nec_plugin
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
class TestNecPortBinding(test_bindings.PortBindingsTestCase,
test_nec_plugin.NecPluginV2TestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
def setUp(self):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
super(TestNecPortBinding, self).setUp()
class TestNecPortBindingNoSG(TestNecPortBinding):
HAS_PORT_FILTER = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
class TestNecPortBindingHost(
test_bindings.PortBindingsHostTestCaseMixin,
test_nec_plugin.NecPluginV2TestCase):
pass
class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
def _get_portinfo(self, datapath_id=None, port_no=None, prefix=None):
if datapath_id is None:
datapath_id = '0xabc'
if port_no is None:
port_no = 1
if prefix is None:
prefix = 'portinfo:'
return {prefix + 'datapath_id': datapath_id,
prefix + 'port_no': port_no}
def _check_response_portbinding_profile(self, port, datapath_id=None,
port_no=None):
expected = self._get_portinfo(datapath_id, port_no, prefix='')
profile = port[portbindings.PROFILE]
self.assertEqual(len(profile), 2)
self.assertEqual(profile['portinfo:datapath_id'],
expected['datapath_id'])
self.assertEqual(profile['portinfo:port_no'],
expected['port_no'])
def _check_response_portbinding_no_profile(self, port):
self.assertIn('status', port)
self.assertNotIn(portbindings.PROFILE, port)
def _get_non_admin_context(self):
return context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
def test_port_create_portinfo(self):
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbinding_profile(port['port'])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
# Check a response of get_port
ctx = context.get_admin_context()
port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbinding_profile(port)
# By default user is admin - now test non admin user
ctx = self._get_non_admin_context()
non_admin_port = self._show(
'ports', port_id, neutron_context=ctx)['port']
self._check_response_portbinding_no_profile(non_admin_port)
# port-update with non admin user should fail
self._update('ports', port_id,
{'port': profile_arg},
expected_code=404,
neutron_context=ctx)
def test_port_update_portinfo(self):
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
with self.port() as port:
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbinding_no_profile(port['port'])
# Check a response of update_port
ctx = context.get_admin_context()
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self._check_response_portbinding_profile(port)
port = self._show('ports', port_id, neutron_context=ctx)['port']
self._check_response_portbinding_profile(port)
def test_port_update_portinfo_detail(self):
with self.port() as port:
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
port_id = port['port']['id']
ctx = context.get_admin_context()
# add portinfo
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
# portinfo unchanged
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
# modify portinfo
profile_arg = {portbindings.PROFILE:
self._get_portinfo(datapath_id='0x1234567890',
port_no=99)}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
# delete portinfo
profile_arg = {portbindings.PROFILE: {}}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 2)
def test_port_create_portinfo_with_empty_dict(self):
profile_arg = {portbindings.PROFILE: {}}
with self.port(arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbinding_no_profile(port['port'])
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
# add portinfo
ctx = context.get_admin_context()
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
port = self._update('ports', port_id, {'port': profile_arg},
neutron_context=ctx)['port']
self._check_response_portbinding_profile(port)
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
def test_port_create_portinfo_non_admin(self):
with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
try:
with self.port(subnet=subnet1,
expected_res_status=403,
arg_list=(portbindings.PROFILE,),
set_context=True, tenant_id='test',
**profile_arg):
pass
except exc.HTTPClientError:
pass
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
def test_port_update_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: self._get_portinfo()}
with self.network() as net1:
with self.subnet(network=net1) as subnet1:
with self.port(subnet=subnet1) as port:
# By default user is admin - now test non admin user
# Note that 404 is returned when prohibit by policy.
# See comment for PolicyNotAuthorized except clause
# in update() in neutron.api.v2.base.Controller.
port_id = port['port']['id']
ctx = self._get_non_admin_context()
port = self._update('ports', port_id,
{'port': profile_arg},
expected_code=404,
neutron_context=ctx)
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
def test_port_create_portinfo_validation_called(self):
# Check validate_portinfo is called.
profile_arg = {portbindings.PROFILE:
{'portinfo:datapath_id': '0xabc',
'portinfo:port_no': 0xffff + 1}}
try:
with self.port(arg_list=(portbindings.PROFILE,),
expected_res_status=400,
**profile_arg):
pass
except exc.HTTPClientError:
pass
class TestNecPortBindingValidatePortInfo(test_nec_plugin.NecPluginV2TestCase):
def test_validate_portinfo_ok(self):
profile = {'portinfo:datapath_id': '0x1234567890abcdef',
'portinfo:port_no': 123}
portinfo = self.plugin._validate_portinfo(profile)
self.assertEqual(portinfo['datapath_id'], '0x1234567890abcdef')
self.assertEqual(portinfo['port_no'], 123)
def test_validate_portinfo_ok_without_0x(self):
profile = {'portinfo:datapath_id': '1234567890abcdef',
'portinfo:port_no': 123}
portinfo = self.plugin._validate_portinfo(profile)
self.assertEqual(portinfo['datapath_id'], '0x1234567890abcdef')
self.assertEqual(portinfo['port_no'], 123)
def _test_validate_exception(self, profile, expected_msg):
e = self.assertRaises(q_exc.InvalidInput,
self.plugin._validate_portinfo, profile)
self.assertThat(str(e), matchers.StartsWith(expected_msg))
def test_validate_portinfo_dict_validation(self):
expected_msg = ("Invalid input for operation: "
"Validation of dictionary's keys failed.")
profile = {'portinfo:port_no': 123}
self._test_validate_exception(profile, expected_msg)
profile = {'portinfo:datapath_id': '0xabcdef'}
self._test_validate_exception(profile, expected_msg)
def test_validate_portinfo_negative_port_number(self):
profile = {'portinfo:datapath_id': '0x1234567890abcdef',
'portinfo:port_no': -1}
expected_msg = ("Invalid input for operation: "
"'-1' should be non-negative.")
self._test_validate_exception(profile, expected_msg)
def test_validate_portinfo_invalid_datapath_id(self):
expected_msg = ("Invalid input for operation: "
"portinfo:datapath_id should be a hex string")
# non hexidecimal datapath_id
profile = {'portinfo:datapath_id': 'INVALID',
'portinfo:port_no': 123}
self._test_validate_exception(profile, expected_msg)
# Too big datapath_id
profile = {'portinfo:datapath_id': '0x10000000000000000',
'portinfo:port_no': 123}
self._test_validate_exception(profile, expected_msg)
def test_validate_portinfo_too_big_port_number(self):
profile = {'portinfo:datapath_id': '0x1234567890abcdef',
'portinfo:port_no': 65536}
expected_msg = ("Invalid input for operation: "
"portinfo:port_no should be [0:65535]")
self._test_validate_exception(profile, expected_msg)