Merging changes addressing Bug # 802772. Thanks lp:danwent !

Description of change:

combining with other work to make the tests run green, this cleans up the tests for the OVS plugin
This commit is contained in:
Somik Behera 2011-08-08 17:35:16 -07:00
commit 5c97893079
8 changed files with 218 additions and 231 deletions

View File

@ -31,6 +31,7 @@ class Fault(webob.exc.HTTPException):
401: "unauthorized", 401: "unauthorized",
420: "networkNotFound", 420: "networkNotFound",
421: "networkInUse", 421: "networkInUse",
422: "networkNameExists",
430: "portNotFound", 430: "portNotFound",
431: "requestedStateInvalid", 431: "requestedStateInvalid",
432: "portInUse", 432: "portInUse",
@ -91,6 +92,22 @@ class NetworkInUse(webob.exc.HTTPClientError):
explanation = ('Unable to remove the network: attachments still plugged.') explanation = ('Unable to remove the network: attachments still plugged.')
class NetworkNameExists(webob.exc.HTTPClientError):
"""
subclass of :class:`~HTTPClientError`
This indicates that the server could not set the network name to the
specified value because another network for the same tenant already has
that name.
code: 422, title: Network Name Exists
"""
code = 422
title = 'Network Name Exists'
explanation = ('Unable to set network name: tenant already has network' \
' with same name.')
class PortNotFound(webob.exc.HTTPClientError): class PortNotFound(webob.exc.HTTPClientError):
""" """
subclass of :class:`~HTTPClientError` subclass of :class:`~HTTPClientError`

View File

@ -107,12 +107,15 @@ class Controller(common.QuantumController):
self._network_ops_param_list) self._network_ops_param_list)
except exc.HTTPError as e: except exc.HTTPError as e:
return faults.Fault(e) return faults.Fault(e)
network = self._plugin.\ try:
network = self._plugin.\
create_network(tenant_id, create_network(tenant_id,
request_params['net-name']) request_params['net-name'])
builder = networks_view.get_view_builder(request) builder = networks_view.get_view_builder(request)
result = builder.build(network) result = builder.build(network)
return dict(networks=result) return dict(networks=result)
except exception.NetworkNameExists as e:
return faults.Fault(faults.NetworkNameExists(e))
def update(self, request, tenant_id, id): def update(self, request, tenant_id, id):
""" Updates the name for the network with the given id """ """ Updates the name for the network with the given id """
@ -128,6 +131,8 @@ class Controller(common.QuantumController):
return exc.HTTPAccepted() return exc.HTTPAccepted()
except exception.NetworkNotFound as e: except exception.NetworkNotFound as e:
return faults.Fault(faults.NetworkNotFound(e)) return faults.Fault(faults.NetworkNotFound(e))
except exception.NetworkNameExists as e:
return faults.Fault(faults.NetworkNameExists(e))
def delete(self, request, tenant_id, id): def delete(self, request, tenant_id, id):
""" Destroys the network with the given id """ """ Destroys the network with the given id """

View File

@ -21,6 +21,9 @@ Quantum-type exceptions. SHOULD include dedicated exception logging.
""" """
import logging import logging
import gettext
gettext.install('quantum', unicode=1)
class QuantumException(Exception): class QuantumException(Exception):
@ -108,6 +111,12 @@ class AlreadyAttached(QuantumException):
"already plugged into port %(att_port_id)s") "already plugged into port %(att_port_id)s")
class NetworkNameExists(QuantumException):
message = _("Unable to set network name to %(net_name). " \
"Network with id %(net_id) already has this name for " \
"tenant %(tenant_id)")
class Duplicate(Error): class Duplicate(Error):
pass pass

View File

@ -20,6 +20,7 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, exc from sqlalchemy.orm import sessionmaker, exc
from quantum.common import exceptions as q_exc
from quantum.db import models from quantum.db import models
_ENGINE = None _ENGINE = None
@ -75,21 +76,29 @@ def unregister_models():
BASE.metadata.drop_all(_ENGINE) BASE.metadata.drop_all(_ENGINE)
def network_create(tenant_id, name): def _check_duplicate_net_name(tenant_id, net_name):
session = get_session() session = get_session()
net = None
try: try:
net = session.query(models.Network).\ net = session.query(models.Network).\
filter_by(tenant_id=tenant_id, name=name).\ filter_by(tenant_id=tenant_id, name=net_name).\
one() one()
raise Exception("Network with name %(name)s already " \ raise q_exc.NetworkNameExists(tenant_id=tenant_id,
"exists for tenant %(tenant_id)s" % locals()) net_name=net_name, net_id=net.uuid)
except exc.NoResultFound: except exc.NoResultFound:
with session.begin(): # this is the "normal" path, as API spec specifies
net = models.Network(tenant_id, name) # that net-names are unique within a tenant
session.add(net) pass
session.flush()
return net
def network_create(tenant_id, name):
session = get_session()
_check_duplicate_net_name(tenant_id, name)
with session.begin():
net = models.Network(tenant_id, name)
session.add(net)
session.flush()
return net
def network_list(tenant_id): def network_list(tenant_id):
@ -105,23 +114,18 @@ def network_get(net_id):
return session.query(models.Network).\ return session.query(models.Network).\
filter_by(uuid=net_id).\ filter_by(uuid=net_id).\
one() one()
except exc.NoResultFound: except exc.NoResultFound, e:
raise Exception("No net found with id = %s" % net_id) raise q_exc.NetworkNotFound(net_id=net_id)
def network_rename(net_id, tenant_id, new_name): def network_rename(net_id, tenant_id, new_name):
session = get_session() session = get_session()
try: net = network_get(net_id)
res = session.query(models.Network).\ _check_duplicate_net_name(tenant_id, new_name)
filter_by(tenant_id=tenant_id, name=new_name).\ net.name = new_name
one() session.merge(net)
except exc.NoResultFound: session.flush()
net = network_get(net_id) return net
net.name = new_name
session.merge(net)
session.flush()
return net
raise Exception("A network with name \"%s\" already exists" % new_name)
def network_destroy(net_id): def network_destroy(net_id):
@ -134,10 +138,13 @@ def network_destroy(net_id):
session.flush() session.flush()
return net return net
except exc.NoResultFound: except exc.NoResultFound:
raise Exception("No network found with id = %s" % net_id) raise q_exc.NetworkNotFound(net_id=net_id)
def port_create(net_id, state=None): def port_create(net_id, state=None):
# confirm network exists
network_get(net_id)
session = get_session() session = get_session()
with session.begin(): with session.begin():
port = models.Port(net_id) port = models.Port(net_id)
@ -154,63 +161,90 @@ def port_list(net_id):
all() all()
def port_get(port_id): def port_get(port_id, net_id):
# confirm network exists
network_get(net_id)
session = get_session() session = get_session()
try: try:
return session.query(models.Port).\ return session.query(models.Port).\
filter_by(uuid=port_id).\ filter_by(uuid=port_id).\
filter_by(network_id=net_id).\
one() one()
except exc.NoResultFound: except exc.NoResultFound:
raise Exception("No port found with id = %s " % port_id) raise q_exc.PortNotFound(net_id=net_id, port_id=port_id)
def port_set_state(port_id, new_state): def port_set_state(port_id, net_id, new_state):
port = port_get(port_id) if new_state not in ('ACTIVE', 'DOWN'):
if port: raise q_exc.StateInvalid(port_state=new_state)
session = get_session()
port.state = new_state
session.merge(port)
session.flush()
return port
# confirm network exists
network_get(net_id)
def port_set_attachment(port_id, new_interface_id): port = port_get(port_id, net_id)
session = get_session() session = get_session()
ports = [] port.state = new_state
session.merge(port)
session.flush()
return port
def port_set_attachment(port_id, net_id, new_interface_id):
# confirm network exists
network_get(net_id)
session = get_session()
port = port_get(port_id, net_id)
if new_interface_id != "": if new_interface_id != "":
# We are setting, not clearing, the attachment-id
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['interface_id'])
try: try:
ports = session.query(models.Port).\ port = session.query(models.Port).\
filter_by(interface_id=new_interface_id).\ filter_by(interface_id=new_interface_id).\
all() one()
raise q_exc.AlreadyAttached(net_id=net_id,
port_id=port_id,
att_id=new_interface_id,
att_port_id=port['uuid'])
except exc.NoResultFound: except exc.NoResultFound:
# this is what should happen
pass pass
if len(ports) == 0: port.interface_id = new_interface_id
port = port_get(port_id) session.merge(port)
port.interface_id = new_interface_id session.flush()
session.merge(port) return port
session.flush()
return port
else:
raise Exception("Port with attachment \"%s\" already exists"
% (new_interface_id))
def port_unset_attachment(port_id): def port_unset_attachment(port_id, net_id):
# confirm network exists
network_get(net_id)
session = get_session() session = get_session()
port = port_get(port_id) port = port_get(port_id, net_id)
port.interface_id = None port.interface_id = None
session.merge(port) session.merge(port)
session.flush session.flush()
def port_destroy(port_id): def port_destroy(port_id, net_id):
# confirm network exists
network_get(net_id)
session = get_session() session = get_session()
try: try:
port = session.query(models.Port).\ port = session.query(models.Port).\
filter_by(uuid=port_id).\ filter_by(uuid=port_id).\
one() filter_by(network_id=net_id).\
one()
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['interface_id'])
session.delete(port) session.delete(port)
session.flush() session.flush()
return port return port
except exc.NoResultFound: except exc.NoResultFound:
raise Exception("No port found with id = %s " % port_id) raise q_exc.PortNotFound(port_id=port_id)

View File

@ -240,7 +240,7 @@ class FakePlugin(object):
def _get_port(self, tenant_id, network_id, port_id): def _get_port(self, tenant_id, network_id, port_id):
net = self._get_network(tenant_id, network_id) net = self._get_network(tenant_id, network_id)
try: try:
port = db.port_get(port_id) port = db.port_get(port_id, network_id)
except: except:
raise exc.PortNotFound(net_id=network_id, port_id=port_id) raise exc.PortNotFound(net_id=network_id, port_id=port_id)
# Port must exist and belong to the appropriate network. # Port must exist and belong to the appropriate network.
@ -322,10 +322,7 @@ class FakePlugin(object):
Virtual Network. Virtual Network.
""" """
LOG.debug("FakePlugin.rename_network() called") LOG.debug("FakePlugin.rename_network() called")
try: db.network_rename(net_id, tenant_id, new_name)
db.network_rename(net_id, tenant_id, new_name)
except:
raise exc.NetworkNotFound(net_id=net_id)
net = self._get_network(tenant_id, net_id) net = self._get_network(tenant_id, net_id)
return net return net
@ -373,7 +370,7 @@ class FakePlugin(object):
self._get_network(tenant_id, net_id) self._get_network(tenant_id, net_id)
self._get_port(tenant_id, net_id, port_id) self._get_port(tenant_id, net_id, port_id)
self._validate_port_state(new_state) self._validate_port_state(new_state)
db.port_set_state(port_id, new_state) db.port_set_state(port_id, net_id, new_state)
port_item = {'port-id': port_id, port_item = {'port-id': port_id,
'port-state': new_state} 'port-state': new_state}
return port_item return port_item
@ -392,7 +389,7 @@ class FakePlugin(object):
raise exc.PortInUse(net_id=net_id, port_id=port_id, raise exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['interface_id']) att_id=port['interface_id'])
try: try:
port = db.port_destroy(port_id) port = db.port_destroy(port_id, net_id)
except Exception, e: except Exception, e:
raise Exception("Failed to delete port: %s" % str(e)) raise Exception("Failed to delete port: %s" % str(e))
d = {} d = {}
@ -412,7 +409,7 @@ class FakePlugin(object):
if port['interface_id']: if port['interface_id']:
raise exc.PortInUse(net_id=net_id, port_id=port_id, raise exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['interface_id']) att_id=port['interface_id'])
db.port_set_attachment(port_id, remote_interface_id) db.port_set_attachment(port_id, net_id, remote_interface_id)
def unplug_interface(self, tenant_id, net_id, port_id): def unplug_interface(self, tenant_id, net_id, port_id):
""" """
@ -423,4 +420,4 @@ class FakePlugin(object):
self._get_port(tenant_id, net_id, port_id) self._get_port(tenant_id, net_id, port_id)
# TODO(salvatore-orlando): # TODO(salvatore-orlando):
# Should unplug on port without attachment raise an Error? # Should unplug on port without attachment raise an Error?
db.port_unset_attachment(port_id) db.port_unset_attachment(port_id, net_id)

View File

@ -198,7 +198,7 @@ class OVSQuantumAgent:
while True: while True:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT * FROM ports") cursor.execute("SELECT * FROM ports where state = 'ACTIVE'")
rows = cursor.fetchall() rows = cursor.fetchall()
cursor.close() cursor.close()
all_bindings = {} all_bindings = {}

View File

@ -19,12 +19,13 @@
import ConfigParser import ConfigParser
import logging as LOG import logging as LOG
from optparse import OptionParser
import os import os
import sys import sys
import unittest import unittest
from quantum.common import exceptions as q_exc
from quantum.quantum_plugin_base import QuantumPluginBase from quantum.quantum_plugin_base import QuantumPluginBase
from optparse import OptionParser
import quantum.db.api as db import quantum.db.api as db
import ovs_db import ovs_db
@ -111,101 +112,93 @@ class OVSQuantumPlugin(QuantumPluginBase):
nets = [] nets = []
for x in db.network_list(tenant_id): for x in db.network_list(tenant_id):
LOG.debug("Adding network: %s" % x.uuid) LOG.debug("Adding network: %s" % x.uuid)
d = {} nets.append(self._make_net_dict(str(x.uuid), x.name, None))
d["net-id"] = str(x.uuid)
d["net-name"] = x.name
nets.append(d)
return nets return nets
def _make_net_dict(self, net_id, net_name, ports):
res = {'net-id': net_id,
'net-name': net_name}
if ports:
res['net-ports'] = ports
return res
def create_network(self, tenant_id, net_name): def create_network(self, tenant_id, net_name):
d = {} net = db.network_create(tenant_id, net_name)
try: LOG.debug("Created network: %s" % net)
res = db.network_create(tenant_id, net_name) vlan_id = self.vmap.acquire(str(net.uuid))
LOG.debug("Created newtork: %s" % res) ovs_db.add_vlan_binding(vlan_id, str(net.uuid))
except Exception, e: return self._make_net_dict(str(net.uuid), net.name, [])
LOG.error("Error: %s" % str(e))
return d
d["net-id"] = str(res.uuid)
d["net-name"] = res.name
vlan_id = self.vmap.acquire(str(res.uuid))
ovs_db.add_vlan_binding(vlan_id, str(res.uuid))
return d
def delete_network(self, tenant_id, net_id): def delete_network(self, tenant_id, net_id):
net = db.network_get(net_id)
# Verify that no attachments are plugged into the network
for port in db.port_list(net_id):
if port['interface_id']:
raise q_exc.NetworkInUse(net_id=net_id)
net = db.network_destroy(net_id) net = db.network_destroy(net_id)
d = {}
d["net-id"] = str(net.uuid)
ovs_db.remove_vlan_binding(net_id) ovs_db.remove_vlan_binding(net_id)
self.vmap.release(net_id) self.vmap.release(net_id)
return d return self._make_net_dict(str(net.uuid), net.name, [])
def get_network_details(self, tenant_id, net_id): def get_network_details(self, tenant_id, net_id):
ports = db.port_list(net_id) net = db.network_get(net_id)
ifaces = [] ports = self.get_all_ports(tenant_id, net_id)
for p in ports: return self._make_net_dict(str(net.uuid), net.name, ports)
ifaces.append(p.interface_id)
return ifaces
def rename_network(self, tenant_id, net_id, new_name): def rename_network(self, tenant_id, net_id, new_name):
try: net = db.network_rename(net_id, tenant_id, new_name)
net = db.network_rename(net_id, tenant_id, new_name) return self._make_net_dict(str(net.uuid), net.name, None)
except Exception, e:
raise Exception("Failed to rename network: %s" % str(e)) def _make_port_dict(self, port_id, port_state, net_id, attachment):
d = {} res = {'port-id': port_id,
d["net-id"] = str(net.uuid) 'port-state': port_state}
d["net-name"] = net.name if net_id:
return d res['net-id'] = net_id
if attachment:
res['attachment-id'] = attachment
return res
def get_all_ports(self, tenant_id, net_id): def get_all_ports(self, tenant_id, net_id):
ids = [] ids = []
ports = db.port_list(net_id) ports = db.port_list(net_id)
for x in ports: for p in ports:
LOG.debug("Appending port: %s" % x.uuid) LOG.debug("Appending port: %s" % p.uuid)
d = {} d = self._make_port_dict(str(p.uuid), p.state, None, None)
d["port-id"] = str(x.uuid)
ids.append(d) ids.append(d)
return ids return ids
def create_port(self, tenant_id, net_id, port_state=None): def create_port(self, tenant_id, net_id, port_state=None):
LOG.debug("Creating port with network_id: %s" % net_id) LOG.debug("Creating port with network_id: %s" % net_id)
port = db.port_create(net_id) port = db.port_create(net_id, port_state)
d = {} return self._make_port_dict(str(port.uuid), port.state, None, None)
d["port-id"] = str(port.uuid)
LOG.debug("-> %s" % (port.uuid))
return d
def delete_port(self, tenant_id, net_id, port_id): def delete_port(self, tenant_id, net_id, port_id):
try: port = db.port_destroy(port_id, net_id)
port = db.port_destroy(port_id) return self._make_port_dict(str(port.uuid), port.state, None, None)
except Exception, e:
raise Exception("Failed to delete port: %s" % str(e))
d = {}
d["port-id"] = str(port.uuid)
return d
def update_port(self, tenant_id, net_id, port_id, port_state): def update_port(self, tenant_id, net_id, port_id, port_state):
""" """
Updates the state of a port on the specified Virtual Network. Updates the state of a port on the specified Virtual Network.
""" """
LOG.debug("update_port() called\n") LOG.debug("update_port() called\n")
port = db.port_get(port_id) port = db.port_get(port_id, net_id)
port['port-state'] = port_state db.port_set_state(port_id, net_id, port_state)
return port return self._make_port_dict(str(port.uuid), port.state, None, None)
def get_port_details(self, tenant_id, net_id, port_id): def get_port_details(self, tenant_id, net_id, port_id):
port = db.port_get(port_id) port = db.port_get(port_id, net_id)
rv = {"port-id": port.uuid, "attachment": port.interface_id, return self._make_port_dict(str(port.uuid), port.state,
"net-id": port.network_id, "port-state": "UP"} port.network_id, port.interface_id)
return rv
def plug_interface(self, tenant_id, net_id, port_id, remote_iface_id): def plug_interface(self, tenant_id, net_id, port_id, remote_iface_id):
db.port_set_attachment(port_id, remote_iface_id) db.port_set_attachment(port_id, net_id, remote_iface_id)
def unplug_interface(self, tenant_id, net_id, port_id): def unplug_interface(self, tenant_id, net_id, port_id):
db.port_set_attachment(port_id, "") db.port_set_attachment(port_id, net_id, "")
def get_interface_details(self, tenant_id, net_id, port_id): def get_interface_details(self, tenant_id, net_id, port_id):
res = db.port_get(port_id) res = db.port_get(port_id, net_id)
return res.interface_id return res.interface_id
@ -227,108 +220,6 @@ class VlanMapTest(unittest.TestCase):
self.assertTrue(self.vmap.get(vlan_id) == None) self.assertTrue(self.vmap.get(vlan_id) == None)
# TODO(bgh): Make the tests use a sqlite database instead of mysql
class OVSPluginTest(unittest.TestCase):
def setUp(self):
self.quantum = OVSQuantumPlugin()
self.tenant_id = "testtenant"
def testCreateNetwork(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
def testGetNetworks(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
net2 = self.quantum.create_network(self.tenant_id, "plugin_test2")
nets = self.quantum.get_all_networks(self.tenant_id)
count = 0
for x in nets:
if "plugin_test" in x["net-name"]:
count += 1
self.assertTrue(count == 2)
def testDeleteNetwork(self):
net = self.quantum.create_network(self.tenant_id, "plugin_test1")
self.quantum.delete_network(self.tenant_id, net["net-id"])
nets = self.quantum.get_all_networks(self.tenant_id)
count = 0
for x in nets:
if "plugin_test" in x["net-name"]:
count += 1
self.assertTrue(count == 0)
def testRenameNetwork(self):
net = self.quantum.create_network(self.tenant_id, "plugin_test1")
net = self.quantum.rename_network(self.tenant_id, net["net-id"],
"plugin_test_renamed")
self.assertTrue(net["net-name"] == "plugin_test_renamed")
def testCreatePort(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
port = self.quantum.create_port(self.tenant_id, net1["net-id"])
ports = self.quantum.get_all_ports(self.tenant_id, net1["net-id"])
count = 0
for p in ports:
count += 1
self.assertTrue(count == 1)
def testDeletePort(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
port = self.quantum.create_port(self.tenant_id, net1["net-id"])
ports = self.quantum.get_all_ports(self.tenant_id, net1["net-id"])
count = 0
for p in ports:
count += 1
self.assertTrue(count == 1)
for p in ports:
self.quantum.delete_port(self.tenant_id, id, p["port-id"])
ports = self.quantum.get_all_ports(self.tenant_id, net1["net-id"])
count = 0
for p in ports:
count += 1
self.assertTrue(count == 0)
def testGetPorts(self):
pass
def testPlugInterface(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
port = self.quantum.create_port(self.tenant_id, net1["net-id"])
self.quantum.plug_interface(self.tenant_id, net1["net-id"],
port["port-id"], "vif1.1")
port = self.quantum.get_port_details(self.tenant_id, net1["net-id"],
port["port-id"])
self.assertTrue(port["attachment"] == "vif1.1")
def testUnPlugInterface(self):
net1 = self.quantum.create_network(self.tenant_id, "plugin_test1")
port = self.quantum.create_port(self.tenant_id, net1["net-id"])
self.quantum.plug_interface(self.tenant_id, net1["net-id"],
port["port-id"], "vif1.1")
port = self.quantum.get_port_details(self.tenant_id, net1["net-id"],
port["port-id"])
self.assertTrue(port["attachment"] == "vif1.1")
self.quantum.unplug_interface(self.tenant_id, net1["net-id"],
port["port-id"])
port = self.quantum.get_port_details(self.tenant_id, net1["net-id"],
port["port-id"])
self.assertTrue(port["attachment"] == "")
def tearDown(self):
networks = self.quantum.get_all_networks(self.tenant_id)
# Clean up any test networks lying around
for net in networks:
id = net["net-id"]
name = net["net-name"]
if "plugin_test" in name:
# Clean up any test ports lying around
ports = self.quantum.get_all_ports(self.tenant_id, id)
for p in ports:
self.quantum.delete_port(self.tenant_id, id, p["port-id"])
self.quantum.delete_network(self.tenant_id, id)
if __name__ == "__main__": if __name__ == "__main__":
usagestr = "Usage: %prog [OPTIONS] <command> [args]" usagestr = "Usage: %prog [OPTIONS] <command> [args]"
parser = OptionParser(usage=usagestr) parser = OptionParser(usage=usagestr)
@ -345,7 +236,5 @@ if __name__ == "__main__":
# Make sqlalchemy quieter # Make sqlalchemy quieter
LOG.getLogger('sqlalchemy.engine').setLevel(LOG.WARN) LOG.getLogger('sqlalchemy.engine').setLevel(LOG.WARN)
# Run the tests # Run the tests
suite = unittest.TestLoader().loadTestsFromTestCase(OVSPluginTest)
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestLoader().loadTestsFromTestCase(VlanMapTest) suite = unittest.TestLoader().loadTestsFromTestCase(VlanMapTest)
unittest.TextTestRunner(verbosity=2).run(suite) unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -150,6 +150,19 @@ class APITest(unittest.TestCase):
network_data['network']) network_data['network'])
LOG.debug("_test_rename_network - format:%s - END", format) LOG.debug("_test_rename_network - format:%s - END", format)
def _test_rename_network_duplicate(self, format):
LOG.debug("_test_rename_network_duplicate - format:%s - START", format)
content_type = "application/%s" % format
network_id1 = self._create_network(format, name="net1")
network_id2 = self._create_network(format, name="net2")
update_network_req = testlib.update_network_request(self.tenant_id,
network_id2,
"net1",
format)
update_network_res = update_network_req.get_response(self.api)
self.assertEqual(update_network_res.status_int, 422)
LOG.debug("_test_rename_network_duplicate - format:%s - END", format)
def _test_rename_network_badrequest(self, format): def _test_rename_network_badrequest(self, format):
LOG.debug("_test_rename_network_badrequest - format:%s - START", LOG.debug("_test_rename_network_badrequest - format:%s - START",
format) format)
@ -427,6 +440,23 @@ class APITest(unittest.TestCase):
show_port_res.body, content_type) show_port_res.body, content_type)
self.assertEqual({'id': port_id, 'state': new_port_state}, self.assertEqual({'id': port_id, 'state': new_port_state},
port_data['port']) port_data['port'])
# now set it back to the original value
update_port_req = testlib.update_port_request(self.tenant_id,
network_id, port_id,
port_state,
format)
update_port_res = update_port_req.get_response(self.api)
self.assertEqual(update_port_res.status_int, 200)
show_port_req = testlib.show_port_request(self.tenant_id,
network_id, port_id,
format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 200)
port_data = self._port_serializer.deserialize(
show_port_res.body, content_type)
self.assertEqual({'id': port_id, 'state': port_state},
port_data['port'])
LOG.debug("_test_set_port_state - format:%s - END", format) LOG.debug("_test_set_port_state - format:%s - END", format)
def _test_set_port_state_networknotfound(self, format): def _test_set_port_state_networknotfound(self, format):
@ -706,6 +736,12 @@ class APITest(unittest.TestCase):
def test_rename_network_xml(self): def test_rename_network_xml(self):
self._test_rename_network('xml') self._test_rename_network('xml')
def test_rename_network_duplicate_json(self):
self._test_rename_network_duplicate('json')
def test_rename_network_duplicate_xml(self):
self._test_rename_network_duplicate('xml')
def test_rename_network_badrequest_json(self): def test_rename_network_badrequest_json(self):
self._test_rename_network_badrequest('json') self._test_rename_network_badrequest('json')