change port status only if port is bound to the good host

if host is set in the rpc message update_device_up/down sent by the agent,
the port status will be changed only if the port is bound to the host.

Change-Id: I0e607c734fbebf0b69f83c3bbd3e25a9783672dc
Closes-Bug: #1224967
This commit is contained in:
mathieu-rohon 2013-10-02 14:13:36 +02:00
parent 26290eae23
commit 250f51b903
16 changed files with 135 additions and 47 deletions

View File

@ -94,16 +94,16 @@ class PluginApi(proxy.RpcProxy):
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id):
def update_device_down(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id),
agent_id=agent_id, host=host),
topic=self.topic)
def update_device_up(self, context, device, agent_id):
def update_device_up(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_up', device=device,
agent_id=agent_id),
agent_id=agent_id, host=host),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):

View File

@ -319,7 +319,8 @@ class HyperVNeutronAgent(object):
try:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(
_("Removing port failed for device %(device)s: %(e)s"),

View File

@ -653,11 +653,13 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
tap_device_name,
self.agent.agent_id)
self.agent.agent_id,
cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
tap_device_name,
self.agent.agent_id)
self.agent.agent_id,
cfg.CONF.host)
else:
bridge_name = self.agent.br_mgr.get_bridge_name(
port['network_id'])
@ -666,7 +668,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status
self.agent.plugin_rpc.update_device_down(self.context,
tap_device_name,
self.agent.agent_id)
self.agent.agent_id,
cfg.CONF.host)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
@ -855,11 +858,13 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
else:
self.remove_port_binding(details['network_id'],
details['port_id'])
@ -875,7 +880,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})

View File

@ -41,6 +41,7 @@ from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
@ -116,13 +117,20 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device)
plugin = manager.NeutronManager.get_plugin()
if port:
entry = {'device': device,
'exists': True}
if port['status'] != q_const.PORT_STATUS_DOWN:
if (host and not
plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else:
@ -135,13 +143,21 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug(_("Device %(device)s up %(agent_id)s"),
host = kwargs.get('host')
port = self.get_port_from_device.get_port(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device)
plugin = manager.NeutronManager.get_plugin()
if port:
if port['status'] != q_const.PORT_STATUS_ACTIVE:
# Set port status to ACTIVE
db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
elif port['status'] != q_const.PORT_STATUS_ACTIVE:
db.set_port_status(port['id'],
q_const.PORT_STATUS_ACTIVE)
else:
LOG.debug(_("%s can not be found in database"), device)

View File

@ -119,3 +119,17 @@ def get_port_and_sgs(port_id):
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
def get_port_binding_host(port_id):
session = db_api.get_session()
with session.begin(subtransactions=True):
try:
query = (session.query(models.PortBinding).
filter(models.PortBinding.port_id.startswith(port_id)).
one())
except exc.NoResultFound:
LOG.debug(_("No binding found for port %(port_id)s"),
{'port_id': port_id})
return
return query.host

View File

@ -591,7 +591,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.warning(_("Port %(port)s updated up by agent not found"),
{'port': port_id})
return False
if port.status != status:
original_port = self._make_port_dict(port)
port.status = status
@ -608,3 +607,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.update_port_postcommit(mech_context)
return True
def port_bound_to_host(self, port_id, host):
port_host = db.get_port_binding_host(port_id)
return (port_host == host)

View File

@ -153,12 +153,20 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
LOG.debug(_("Device %(device)s no longer exists at agent "
"%(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
port_exists = True
if (host and not plugin.port_bound_to_host(port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return {'device': device,
'exists': port_exists}
port_exists = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_DOWN)
@ -169,11 +177,17 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
if (host and not plugin.port_bound_to_host(port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE)

View File

@ -297,11 +297,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if port['admin_state_up']:
# update plugin about port status
self.plugin_rpc.update_device_up(self.context, port['id'],
self.agent_id)
self.agent_id,
cfg.CONF.host)
else:
# update plugin about port status
self.plugin_rpc.update_device_down(self.context, port['id'],
self.agent_id)
self.agent_id,
cfg.CONF.host)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
@ -910,7 +912,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
@ -934,7 +937,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
return resync
def treat_devices_removed(self, devices):
@ -945,7 +949,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
@ -966,7 +971,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})

View File

@ -52,6 +52,7 @@ from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
@ -123,18 +124,25 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = ovs_db_v2.get_port(device)
if port:
entry = {'device': device,
'exists': True}
if port['status'] != q_const.PORT_STATUS_DOWN:
plugin = manager.NeutronManager.get_plugin()
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN
ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
ovs_db_v2.set_port_status(port['id'],
q_const.PORT_STATUS_DOWN)
else:
entry = {'device': device,
'exists': False}
@ -145,11 +153,19 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = ovs_db_v2.get_port(device)
plugin = manager.NeutronManager.get_plugin()
if port:
if port['status'] != q_const.PORT_STATUS_ACTIVE:
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
elif port['status'] != q_const.PORT_STATUS_ACTIVE:
ovs_db_v2.set_port_status(port['id'],
q_const.PORT_STATUS_ACTIVE)
else:

View File

@ -116,7 +116,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -753,7 +753,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
rpc_obj.update_device_down.assert_called_with(
self.lb_rpc.context,
"tap123",
self.lb_rpc.agent.agent_id
self.lb_rpc.agent.agent_id,
cfg.CONF.host
)
def test_port_update_plugin_rpc_failed(self):

View File

@ -118,11 +118,13 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')

View File

@ -93,7 +93,8 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@ -107,4 +108,5 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')

View File

@ -141,11 +141,13 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')

View File

@ -288,7 +288,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
"124", "vlan", "physnet",
"1", False)
upddown_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id)
"123", self.agent.agent_id,
cfg.CONF.host)
port["admin_state_up"] = True
self.agent.port_update("unused_context",
@ -297,7 +298,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
segmentation_id="1",
physical_network="physnet")
updup_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id)
"123", self.agent.agent_id,
cfg.CONF.host)
def test_port_update_plugin_rpc_failed(self):
port = {'id': 1,

View File

@ -102,7 +102,8 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@ -116,4 +117,5 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')