diff --git a/neutron/plugins/ml2/drivers/cisco/exceptions.py b/neutron/plugins/ml2/drivers/cisco/exceptions.py index c431f9b2d6..c119cf96d4 100644 --- a/neutron/plugins/ml2/drivers/cisco/exceptions.py +++ b/neutron/plugins/ml2/drivers/cisco/exceptions.py @@ -29,8 +29,8 @@ class CredentialNameNotFound(exceptions.NeutronException): class CredentialAlreadyExists(exceptions.NeutronException): - """Credential ID already exists.""" - message = _("Credential %(credential_id)s already exists " + """Credential name already exists.""" + message = _("Credential %(credential_name)s already exists " "for tenant %(tenant_id)s.") diff --git a/neutron/tests/unit/ml2/drivers/test_cisco_network_db.py b/neutron/tests/unit/ml2/drivers/test_cisco_network_db.py new file mode 100644 index 0000000000..849e7fe63f --- /dev/null +++ b/neutron/tests/unit/ml2/drivers/test_cisco_network_db.py @@ -0,0 +1,167 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import testtools + +from neutron.db import api as db +from neutron.plugins.ml2.drivers.cisco import exceptions +from neutron.plugins.ml2.drivers.cisco import network_db_v2 +from neutron.tests import base + + +class CiscoNetworkCredentialDbTest(base.BaseTestCase): + + """Unit tests for Cisco ML2 mechanism driver credentials database.""" + + CredObj = collections.namedtuple('CredObj', + 'tenant_id cred_name user_name pwd') + + def setUp(self): + super(CiscoNetworkCredentialDbTest, self).setUp() + db.configure_db() + self.addCleanup(db.clear_db) + + def _cred_test_obj(self, tenant_num, cred_num): + """Create a Credential test object from a pair of numbers.""" + tenant_id = 'tenant_%s' % tenant_num + cred_name = 'credential_%s_%s' % (tenant_num, cred_num) + user_name = 'user_%s_%s' % (tenant_num, cred_num) + pwd = 'password_%s_%s' % (tenant_num, cred_num) + return self.CredObj(tenant_id, cred_name, user_name, pwd) + + def _assert_cred_equal(self, db_cred, test_cred): + """Assert that a database credential matches a credential test obj.""" + self.assertEqual(db_cred.tenant_id, test_cred.tenant_id) + self.assertEqual(db_cred.credential_name, test_cred.cred_name) + self.assertEqual(db_cred.user_name, test_cred.user_name) + self.assertEqual(db_cred.password, test_cred.pwd) + + def _get_credential(self, db_cred): + """Lists credentials that match a credential's tenant and cred IDs.""" + return network_db_v2.get_credential(db_cred.tenant_id, + db_cred.credential_id) + + def _get_credential_name(self, db_cred): + """Lists credentials that match a cred's tenant ID and cred name.""" + return network_db_v2.get_credential_name(db_cred.tenant_id, + db_cred.credential_name) + + def _add_credential(self, test_cred): + """Adds a credential to the database.""" + return network_db_v2.add_credential(test_cred.tenant_id, + test_cred.cred_name, + test_cred.user_name, + test_cred.pwd) + + def _remove_credential(self, db_cred): + """Removes a credential from the database.""" + return network_db_v2.remove_credential(db_cred.tenant_id, + db_cred.credential_id) + + def _update_credential(self, db_cred, new_user_name=None, + new_password=None): + """Updates a credential with a new user name and password.""" + return network_db_v2.update_credential(db_cred.tenant_id, + db_cred.credential_id, + new_user_name, + new_password) + + def test_credential_add_remove(self): + """Tests add and removal of credential to/from the database.""" + cred11 = self._cred_test_obj(1, 1) + cred = self._add_credential(cred11) + self._assert_cred_equal(cred, cred11) + cred = self._remove_credential(cred) + self._assert_cred_equal(cred, cred11) + cred = self._remove_credential(cred) + self.assertIsNone(cred) + + def test_credential_add_dup(self): + """Tests addition of a duplicate credential to the database.""" + cred22 = self._cred_test_obj(2, 2) + cred = self._add_credential(cred22) + self._assert_cred_equal(cred, cred22) + with testtools.ExpectedException(exceptions.CredentialAlreadyExists): + self._add_credential(cred22) + cred = self._remove_credential(cred) + self._assert_cred_equal(cred, cred22) + cred = self._remove_credential(cred) + self.assertIsNone(cred) + + def test_credential_get(self): + """Tests get of credentials by tenant ID and credential ID.""" + cred11 = self._cred_test_obj(1, 1) + cred11_db = self._add_credential(cred11) + cred21 = self._cred_test_obj(2, 1) + cred21_db = self._add_credential(cred21) + cred22 = self._cred_test_obj(2, 2) + cred22_db = self._add_credential(cred22) + + cred = self._get_credential(cred11_db) + self._assert_cred_equal(cred, cred11) + cred = self._get_credential(cred21_db) + self._assert_cred_equal(cred, cred21) + cred = self._get_credential(cred22_db) + self._assert_cred_equal(cred, cred22) + + with testtools.ExpectedException(exceptions.CredentialNotFound): + network_db_v2.get_credential("dummyTenantId", "dummyCredentialId") + + cred_all_t1 = network_db_v2.get_all_credentials(cred11.tenant_id) + self.assertEqual(len(cred_all_t1), 1) + cred_all_t2 = network_db_v2.get_all_credentials(cred21.tenant_id) + self.assertEqual(len(cred_all_t2), 2) + + def test_credential_get_name(self): + """Tests get of credential by tenant ID and credential name.""" + cred11 = self._cred_test_obj(1, 1) + cred11_db = self._add_credential(cred11) + cred21 = self._cred_test_obj(2, 1) + cred21_db = self._add_credential(cred21) + cred22 = self._cred_test_obj(2, 2) + cred22_db = self._add_credential(cred22) + self.assertNotEqual(cred11_db.credential_id, cred21_db.credential_id) + self.assertNotEqual(cred11_db.credential_id, cred22_db.credential_id) + self.assertNotEqual(cred21_db.credential_id, cred22_db.credential_id) + + cred = self._get_credential_name(cred11_db) + self._assert_cred_equal(cred, cred11) + cred = self._get_credential_name(cred21_db) + self._assert_cred_equal(cred, cred21) + cred = self._get_credential_name(cred22_db) + self._assert_cred_equal(cred, cred22) + + with testtools.ExpectedException(exceptions.CredentialNameNotFound): + network_db_v2.get_credential_name("dummyTenantId", + "dummyCredentialName") + + def test_credential_update(self): + """Tests update of a credential with a new user name and password.""" + cred11 = self._cred_test_obj(1, 1) + cred11_db = self._add_credential(cred11) + self._update_credential(cred11_db) + new_user_name = "new user name" + new_pwd = "new password" + new_credential = self._update_credential( + cred11_db, new_user_name, new_pwd) + expected_cred = self.CredObj( + cred11.tenant_id, cred11.cred_name, new_user_name, new_pwd) + self._assert_cred_equal(new_credential, expected_cred) + new_credential = self._get_credential(cred11_db) + self._assert_cred_equal(new_credential, expected_cred) + with testtools.ExpectedException(exceptions.CredentialNotFound): + network_db_v2.update_credential( + "dummyTenantId", "dummyCredentialId", new_user_name, new_pwd) diff --git a/neutron/tests/unit/ml2/drivers/test_cisco_nexus_db.py b/neutron/tests/unit/ml2/drivers/test_cisco_nexus_db.py new file mode 100644 index 0000000000..079951b65e --- /dev/null +++ b/neutron/tests/unit/ml2/drivers/test_cisco_nexus_db.py @@ -0,0 +1,221 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import testtools + +from neutron.db import api as db +from neutron.plugins.ml2.drivers.cisco import exceptions +from neutron.plugins.ml2.drivers.cisco import nexus_db_v2 +from neutron.tests import base + + +class CiscoNexusDbTest(base.BaseTestCase): + + """Unit tests for Cisco mechanism driver's Nexus port binding database.""" + + NpbObj = collections.namedtuple('NpbObj', 'port vlan switch instance') + + def setUp(self): + super(CiscoNexusDbTest, self).setUp() + db.configure_db() + self.addCleanup(db.clear_db) + + def _npb_test_obj(self, pnum, vnum, switch='10.9.8.7', instance=None): + """Creates a Nexus port binding test object from a pair of numbers.""" + if pnum is 'router': + port = pnum + else: + port = '1/%s' % pnum + if instance is None: + instance = 'instance_%s_%s' % (pnum, vnum) + return self.NpbObj(port, vnum, switch, instance) + + def _assert_bindings_match(self, npb, npb_obj): + """Asserts that a port binding matches a port binding test obj.""" + self.assertEqual(npb.port_id, npb_obj.port) + self.assertEqual(npb.vlan_id, npb_obj.vlan) + self.assertEqual(npb.switch_ip, npb_obj.switch) + self.assertEqual(npb.instance_id, npb_obj.instance) + + def _add_binding_to_db(self, npb): + """Adds a port binding to the Nexus database.""" + return nexus_db_v2.add_nexusport_binding( + npb.port, npb.vlan, npb.switch, npb.instance) + + def _add_bindings_to_db(self, npbs): + """Adds a list of port bindings to the Nexus database.""" + for npb in npbs: + nexus_db_v2.add_nexusport_binding( + npb.port, npb.vlan, npb.switch, npb.instance) + + def _remove_binding_from_db(self, npb): + """Removes a port binding from the Nexus database.""" + return nexus_db_v2.remove_nexusport_binding( + npb.port, npb.vlan, npb.switch, npb.instance) + + def _get_nexusport_binding(self, npb): + """Gets a port binding based on port, vlan, switch, and instance.""" + return nexus_db_v2.get_nexusport_binding( + npb.port, npb.vlan, npb.switch, npb.instance) + + def _get_nexusvlan_binding(self, npb): + """Gets port bindings based on vlan and switch.""" + return nexus_db_v2.get_nexusvlan_binding(npb.vlan, npb.switch) + + def _get_nexusvm_binding(self, npb): + """Gets port bindings based on vlan and instance.""" + return nexus_db_v2.get_nexusvm_binding(npb.vlan, npb.instance) + + def _get_port_vlan_switch_binding(self, npb): + """Gets port bindings based on port, vlan, and switch.""" + return nexus_db_v2.get_port_vlan_switch_binding( + npb.port, npb.vlan, npb.switch) + + def _get_port_switch_bindings(self, npb): + """Get port bindings based on port and switch.""" + return nexus_db_v2.get_port_switch_bindings(npb.port, npb.switch) + + def test_nexusportbinding_add_remove(self): + """Tests add and removal of port bindings from the Nexus database.""" + npb11 = self._npb_test_obj(10, 100) + npb = self._add_binding_to_db(npb11) + self._assert_bindings_match(npb, npb11) + npb = self._remove_binding_from_db(npb11) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb11) + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + self._remove_binding_from_db(npb11) + + def test_nexusportbinding_get(self): + """Tests get of specific port bindings from the database.""" + npb11 = self._npb_test_obj(10, 100) + npb21 = self._npb_test_obj(20, 100) + npb22 = self._npb_test_obj(20, 200) + self._add_bindings_to_db([npb11, npb21, npb22]) + + npb = self._get_nexusport_binding(npb11) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb11) + npb = self._get_nexusport_binding(npb21) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb21) + npb = self._get_nexusport_binding(npb22) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb22) + + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + nexus_db_v2.get_nexusport_binding( + npb21.port, npb21.vlan, npb21.switch, "dummyInstance") + + def test_nexusvlanbinding_get(self): + """Test get of port bindings based on vlan and switch.""" + npb11 = self._npb_test_obj(10, 100) + npb21 = self._npb_test_obj(20, 100) + npb22 = self._npb_test_obj(20, 200) + self._add_bindings_to_db([npb11, npb21, npb22]) + + npb_all_v100 = self._get_nexusvlan_binding(npb11) + self.assertEqual(len(npb_all_v100), 2) + npb_v200 = self._get_nexusvlan_binding(npb22) + self.assertEqual(len(npb_v200), 1) + self._assert_bindings_match(npb_v200[0], npb22) + + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + nexus_db_v2.get_nexusvlan_binding(npb21.vlan, "dummySwitch") + + def test_nexusvmbinding_get(self): + """Test get of port bindings based on vlan and instance.""" + npb11 = self._npb_test_obj(10, 100) + npb21 = self._npb_test_obj(20, 100) + npb22 = self._npb_test_obj(20, 200) + self._add_bindings_to_db([npb11, npb21, npb22]) + + npb = self._get_nexusvm_binding(npb21) + self._assert_bindings_match(npb, npb21) + npb = self._get_nexusvm_binding(npb22) + self._assert_bindings_match(npb, npb22) + + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + nexus_db_v2.get_nexusvm_binding(npb21.vlan, "dummyInstance") + + def test_nexusportvlanswitchbinding_get(self): + """Tests get of port bindings based on port, vlan, and switch.""" + npb11 = self._npb_test_obj(10, 100) + npb21 = self._npb_test_obj(20, 100) + self._add_bindings_to_db([npb11, npb21]) + + npb = self._get_port_vlan_switch_binding(npb11) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb11) + + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + nexus_db_v2.get_port_vlan_switch_binding( + npb21.port, npb21.vlan, "dummySwitch") + + def test_nexusportswitchbinding_get(self): + """Tests get of port bindings based on port and switch.""" + npb11 = self._npb_test_obj(10, 100) + npb21 = self._npb_test_obj(20, 100, switch='2.2.2.2') + npb22 = self._npb_test_obj(20, 200, switch='2.2.2.2') + self._add_bindings_to_db([npb11, npb21, npb22]) + + npb = self._get_port_switch_bindings(npb11) + self.assertEqual(len(npb), 1) + self._assert_bindings_match(npb[0], npb11) + npb_all_p20 = self._get_port_switch_bindings(npb21) + self.assertEqual(len(npb_all_p20), 2) + + npb = nexus_db_v2.get_port_switch_bindings(npb21.port, "dummySwitch") + self.assertIsNone(npb) + + def test_nexussvibinding_get(self): + """Tests get of switch virtual interface port bindings.""" + npbr1 = self._npb_test_obj('router', 100) + npb21 = self._npb_test_obj(20, 100) + self._add_bindings_to_db([npbr1, npb21]) + + npb_svi = nexus_db_v2.get_nexussvi_bindings() + self.assertEqual(len(npb_svi), 1) + self._assert_bindings_match(npb_svi[0], npbr1) + + npbr2 = self._npb_test_obj('router', 200) + self._add_binding_to_db(npbr2) + npb_svi = nexus_db_v2.get_nexussvi_bindings() + self.assertEqual(len(npb_svi), 2) + + def test_nexusbinding_update(self): + """Tests update of vlan IDs for port bindings.""" + npb11 = self._npb_test_obj(10, 100, switch='1.1.1.1', instance='test') + npb21 = self._npb_test_obj(20, 100, switch='1.1.1.1', instance='test') + self._add_bindings_to_db([npb11, npb21]) + + npb_all_v100 = nexus_db_v2.get_nexusvlan_binding(100, '1.1.1.1') + self.assertEqual(len(npb_all_v100), 2) + + npb22 = self._npb_test_obj(20, 200, switch='1.1.1.1', instance='test') + npb = nexus_db_v2.update_nexusport_binding(npb21.port, 200) + self._assert_bindings_match(npb, npb22) + + npb_all_v100 = nexus_db_v2.get_nexusvlan_binding(100, '1.1.1.1') + self.assertEqual(len(npb_all_v100), 1) + self._assert_bindings_match(npb_all_v100[0], npb11) + + npb = nexus_db_v2.update_nexusport_binding(npb21.port, 0) + self.assertIsNone(npb) + + npb33 = self._npb_test_obj(30, 300, switch='1.1.1.1', instance='test') + with testtools.ExpectedException(exceptions.NexusPortBindingNotFound): + nexus_db_v2.update_nexusport_binding(npb33.port, 200)