Merge "Fix resource status in NEC Plugin"

This commit is contained in:
Jenkins 2013-08-12 17:17:53 +00:00 committed by Gerrit Code Review
commit 77f9757467
2 changed files with 200 additions and 95 deletions

View File

@ -19,7 +19,7 @@
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import constants as const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
@ -38,6 +38,7 @@ from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
@ -47,21 +48,6 @@ from neutron.plugins.nec import packet_filter
LOG = logging.getLogger(__name__)
class OperationalStatus:
"""Enumeration for operational status.
ACTIVE: The resource is available.
DOWN: The resource is not operational. This might indicate
admin_state_up=False, or lack of OpenFlow info for the port.
BUILD: The plugin is creating the resource.
ERROR: Some error occured.
"""
ACTIVE = "ACTIVE"
DOWN = "DOWN"
BUILD = "BUILD"
ERROR = "ERROR"
class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
@ -123,10 +109,10 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
self.agent_notifiers[const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
@ -151,7 +137,6 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def activate_port_if_ready(self, context, port, network=None):
"""Activate port by creating port on OFC if ready.
Activate port and packet_filters associated with the port.
Conditions to activate port on OFC are:
* port admin_state is UP
* network admin_state is UP
@ -161,52 +146,29 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
network = super(NECPluginV2, self).get_network(context,
port['network_id'])
port_status = OperationalStatus.ACTIVE
if not port['admin_state_up']:
LOG.debug(_("activate_port_if_ready(): skip, "
"port.admin_state_up is False."))
port_status = OperationalStatus.DOWN
return port
elif not network['admin_state_up']:
LOG.debug(_("activate_port_if_ready(): skip, "
"network.admin_state_up is False."))
port_status = OperationalStatus.DOWN
return port
elif not ndb.get_portinfo(context.session, port['id']):
LOG.debug(_("activate_port_if_ready(): skip, "
"no portinfo for this port."))
port_status = OperationalStatus.DOWN
return port
elif self.ofc.exists_ofc_port(context, port['id']):
LOG.debug(_("activate_port_if_ready(): skip, "
"ofc_port already exists."))
return port
if port_status in [OperationalStatus.ACTIVE]:
if self.ofc.exists_ofc_port(context, port['id']):
LOG.debug(_("activate_port_if_ready(): skip, "
"ofc_port already exists."))
else:
try:
self.ofc.create_ofc_port(context, port['id'], port)
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
reason = _("create_ofc_port() failed due to %s") % exc
LOG.error(reason)
port_status = OperationalStatus.ERROR
if port_status is not port['status']:
self._update_resource_status(context, "port", port['id'],
port_status)
def deactivate_port(self, context, port):
"""Deactivate port by deleting port from OFC if exists.
Deactivate port and packet_filters associated with the port.
"""
port_status = OperationalStatus.DOWN
if self.ofc.exists_ofc_port(context, port['id']):
try:
self.ofc.delete_ofc_port(context, port['id'], port)
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
reason = _("delete_ofc_port() failed due to %s") % exc
LOG.error(reason)
port_status = OperationalStatus.ERROR
else:
LOG.debug(_("deactivate_port(): skip, ofc_port does not "
"exist."))
try:
self.ofc.create_ofc_port(context, port['id'], port)
port_status = const.PORT_STATUS_ACTIVE
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
LOG.error(_("create_ofc_port() failed due to %s"), exc)
port_status = const.PORT_STATUS_ERROR
if port_status is not port['status']:
self._update_resource_status(context, "port", port['id'],
@ -215,34 +177,62 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
return port
def deactivate_port(self, context, port):
"""Deactivate port by deleting port from OFC if exists."""
if not self.ofc.exists_ofc_port(context, port['id']):
LOG.debug(_("deactivate_port(): skip, ofc_port does not "
"exist."))
return port
try:
self.ofc.delete_ofc_port(context, port['id'], port)
port_status = const.PORT_STATUS_DOWN
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
LOG.error(_("delete_ofc_port() failed due to %s"), exc)
port_status = const.PORT_STATUS_ERROR
if port_status is not port['status']:
self._update_resource_status(context, "port", port['id'],
port_status)
port['status'] = port_status
return port
def _net_status(self, network):
# NOTE: NEC Plugin accept admin_state_up. When it's False, this plugin
# deactivate all ports on the network to drop all packet and show
# status='DOWN' to users. But the network is kept defined on OFC.
if network['network']['admin_state_up']:
return const.NET_STATUS_ACTIVE
else:
return const.NET_STATUS_DOWN
def create_network(self, context, network):
"""Create a new network entry on DB, and create it on OFC."""
LOG.debug(_("NECPluginV2.create_network() called, "
"network=%s ."), network)
tenant_id = self._get_tenant_id_for_create(context, network['network'])
net_name = network['network']['name']
net_id = uuidutils.generate_uuid()
#set up default security groups
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
network['network']['id'] = net_id
network['network']['status'] = self._net_status(network)
try:
if not self.ofc.exists_ofc_tenant(context, tenant_id):
self.ofc.create_ofc_tenant(context, tenant_id)
self.ofc.create_ofc_network(context, tenant_id, net_id, net_name)
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
LOG.error(_("failed to create network id=%(id)s on "
"OFC: %(exc)s"), {'id': net_id, 'exc': exc})
network['network']['status'] = const.NET_STATUS_ERROR
with context.session.begin(subtransactions=True):
new_net = super(NECPluginV2, self).create_network(context, network)
self._process_l3_create(context, new_net, network['network'])
self._update_resource_status(context, "network", new_net['id'],
OperationalStatus.BUILD)
try:
if not self.ofc.exists_ofc_tenant(context, new_net['tenant_id']):
self.ofc.create_ofc_tenant(context, new_net['tenant_id'])
self.ofc.create_ofc_network(context, new_net['tenant_id'],
new_net['id'], new_net['name'])
except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
reason = _("create_network() failed due to %s") % exc
LOG.error(reason)
self._update_resource_status(context, "network", new_net['id'],
OperationalStatus.ERROR)
else:
self._update_resource_status(context, "network", new_net['id'],
OperationalStatus.ACTIVE)
return new_net
@ -255,6 +245,10 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
LOG.debug(_("NECPluginV2.update_network() called, "
"id=%(id)s network=%(network)s ."),
{'id': id, 'network': network})
if 'admin_state_up' in network['network']:
network['network']['status'] = self._net_status(network)
session = context.session
with session.begin(subtransactions=True):
old_net = super(NECPluginV2, self).get_network(context, id)
@ -264,19 +258,15 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
changed = (old_net['admin_state_up'] is not new_net['admin_state_up'])
if changed and not new_net['admin_state_up']:
self._update_resource_status(context, "network", id,
OperationalStatus.DOWN)
# disable all active ports of the network
filters = dict(network_id=[id], status=[OperationalStatus.ACTIVE])
filters = dict(network_id=[id], status=[const.PORT_STATUS_ACTIVE])
ports = super(NECPluginV2, self).get_ports(context,
filters=filters)
for port in ports:
self.deactivate_port(context, port)
elif changed and new_net['admin_state_up']:
self._update_resource_status(context, "network", id,
OperationalStatus.ACTIVE)
# enable ports of the network
filters = dict(network_id=[id], status=[OperationalStatus.DOWN],
filters = dict(network_id=[id], status=[const.PORT_STATUS_DOWN],
admin_state_up=[True])
ports = super(NECPluginV2, self).get_ports(context,
filters=filters)
@ -308,7 +298,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
_error_ports = []
for port in ports:
port = self.deactivate_port(context, port)
if port['status'] == OperationalStatus.ERROR:
if port['status'] == const.PORT_STATUS_ERROR:
_error_ports.append(port['id'])
if _error_ports:
reason = (_("Failed to delete port(s)=%s from OFC.") %
@ -329,7 +319,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
reason = _("delete_network() failed due to %s") % exc
LOG.error(reason)
self._update_resource_status(context, "network", net['id'],
OperationalStatus.ERROR)
const.NET_STATUS_ERROR)
raise
super(NECPluginV2, self).delete_network(context, id)
@ -355,6 +345,9 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
port['port']['status'] = const.PORT_STATUS_DOWN
port_data = port['port']
with context.session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
@ -366,10 +359,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
self._update_resource_status(context, "port", port['id'],
OperationalStatus.BUILD)
self.activate_port_if_ready(context, port)
return port
return self.activate_port_if_ready(context, port)
def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port.
@ -398,9 +389,9 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
if changed:
if new_port['admin_state_up']:
self.activate_port_if_ready(context, new_port)
new_port = self.activate_port_if_ready(context, new_port)
else:
self.deactivate_port(context, old_port)
new_port = self.deactivate_port(context, new_port)
return new_port
@ -413,7 +404,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
port = self.get_port(context, id)
port = self.deactivate_port(context, port)
if port['status'] == OperationalStatus.ERROR:
if port['status'] == const.PORT_STATUS_ERROR:
reason = _("Failed to delete port=%s from OFC.") % id
raise nexc.OFCException(reason=reason)

View File

@ -19,6 +19,7 @@ import fixtures
import mock
import webob.exc
from neutron.common import constants
from neutron.common.test_lib import test_config
from neutron.common import topics
from neutron import context
@ -86,6 +87,8 @@ class NecPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
self.ofc = self.plugin.ofc
self.callback_nec = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
self.context = context.get_admin_context()
self.net_create_status = 'ACTIVE'
self.port_create_status = 'DOWN'
class TestNecBasicGet(test_plugin.TestBasicGet, NecPluginV2TestCase):
@ -312,7 +315,7 @@ class TestNecPluginDbTest(NecPluginV2TestCase):
for status in ["DOWN", "BUILD", "ERROR", "ACTIVE"]:
self.plugin._update_resource_status(
self.context, 'network', net_id,
getattr(nec_plugin.OperationalStatus, status))
getattr(constants, 'NET_STATUS_%s' % status))
n = self.plugin._get_network(self.context, net_id)
self.assertEqual(status, n.status)
@ -376,6 +379,7 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
ctx = mock.ANY
with self.network(admin_state_up=False) as network:
net = network['network']
self.assertEqual(network['network']['status'], 'DOWN')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
@ -423,6 +427,7 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
# network from OFC. Deletion of network is not the scope of this test.
with self.network(do_delete=False) as network:
net = network['network']
self.assertEqual(net['status'], 'ERROR')
net_ref = self._show('networks', net['id'])
self.assertEqual(net_ref['network']['status'], 'ERROR')
@ -441,15 +446,26 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
net = network['network']
self.assertEqual(network['network']['status'], 'ACTIVE')
net_ref = self._show('networks', net['id'])
self.assertEqual(net_ref['network']['status'], 'ACTIVE')
# Set admin_state_up to False
res = self._update_resource('network', net['id'],
{'admin_state_up': False})
self.assertFalse(res['admin_state_up'])
self.assertEqual(res['status'], 'DOWN')
net_ref = self._show('networks', net['id'])
self.assertEqual(net_ref['network']['status'], 'DOWN')
# Set admin_state_up to True
res = self._update_resource('network', net['id'],
{'admin_state_up': True})
self.assertTrue(res['admin_state_up'])
self.assertEqual(res['status'], 'ACTIVE')
net_ref = self._show('networks', net['id'])
self.assertEqual(net_ref['network']['status'], 'ACTIVE')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
@ -471,7 +487,10 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
net_id = port['port']['network_id']
net = self._show_resource('network', net_id)
self.assertEqual(net['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'DOWN')
p1_ref = self._show('ports', p1['id'])
self.assertEqual(p1_ref['port']['status'], 'DOWN')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
@ -495,7 +514,10 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
net_id = port['port']['network_id']
net = self._show_resource('network', net_id)
self.assertEqual(net['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'DOWN')
p1_ref = self._show('ports', p1['id'])
self.assertEqual(p1_ref['port']['status'], 'DOWN')
# Check the port is not created on OFC
self.assertFalse(self.ofc.create_ofc_port.call_count)
@ -505,6 +527,9 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
self.rpcapi_update_ports(added=[portinfo])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
p1_ref = self._show('ports', p1['id'])
self.assertEqual(p1_ref['port']['status'], 'ACTIVE')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
@ -646,6 +671,7 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
p1 = port['port']
net_id = port['port']['network_id']
res_id = net_id if resource == 'network' else p1['id']
self.assertEqual(p1['status'], 'DOWN')
net = self._show_resource('network', net_id)
@ -657,13 +683,15 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
self.rpcapi_update_ports(added=[portinfo])
self.assertFalse(self.ofc.create_ofc_port.call_count)
self._update_resource(resource, res_id,
{'admin_state_up': True})
res = self._update_resource(resource, res_id,
{'admin_state_up': True})
self.assertEqual(res['status'], 'ACTIVE')
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertFalse(self.ofc.delete_ofc_port.call_count)
self._update_resource(resource, res_id,
{'admin_state_up': False})
res = self._update_resource(resource, res_id,
{'admin_state_up': False})
self.assertEqual(res['status'], 'DOWN')
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
expected = [
@ -684,6 +712,92 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
]
self.ofc.assert_has_calls(expected)
def test_update_port_with_ofc_creation_failure(self):
with self.port(admin_state_up=False) as port:
port_id = port['port']['id']
portinfo = {'id': port_id, 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo])
self.ofc.set_raise_exc('create_ofc_port',
nexc.OFCException(reason='hoge'))
body = {'port': {'admin_state_up': True}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'ERROR')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ERROR')
body = {'port': {'admin_state_up': False}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'ERROR')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ERROR')
self.ofc.set_raise_exc('create_ofc_port', None)
body = {'port': {'admin_state_up': True}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'ACTIVE')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ACTIVE')
ctx = mock.ANY
port = mock.ANY
expected = [
mock.call.exists_ofc_port(ctx, port_id),
mock.call.create_ofc_port(ctx, port_id, port),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.create_ofc_port(ctx, port_id, port),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.delete_ofc_port(ctx, port_id, port),
]
self.ofc.assert_has_calls(expected)
self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
def test_update_port_with_ofc_deletion_failure(self):
with self.port() as port:
port_id = port['port']['id']
portinfo = {'id': port_id, 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo])
self.ofc.set_raise_exc('delete_ofc_port',
nexc.OFCException(reason='hoge'))
body = {'port': {'admin_state_up': False}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'ERROR')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ERROR')
body = {'port': {'admin_state_up': True}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'ERROR')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ERROR')
self.ofc.set_raise_exc('delete_ofc_port', None)
body = {'port': {'admin_state_up': False}}
res = self._update('ports', port_id, body)
self.assertEqual(res['port']['status'], 'DOWN')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'DOWN')
ctx = mock.ANY
port = mock.ANY
expected = [
mock.call.exists_ofc_port(ctx, port_id),
mock.call.create_ofc_port(ctx, port_id, port),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.delete_ofc_port(ctx, port_id, port),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.exists_ofc_port(ctx, port_id),
mock.call.delete_ofc_port(ctx, port_id, port),
]
self.ofc.assert_has_calls(expected)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 2)
def test_delete_port_with_ofc_deletion_failure(self):
self.ofc.set_raise_exc('delete_ofc_port',
nexc.OFCException(reason='hoge'))