Merge "Reprogram flows when ovs-vswitchd restarts"
This commit is contained in:
commit
79ef08f3ca
@ -203,6 +203,13 @@ class OVSBridge(BaseOVS):
|
||||
else:
|
||||
self.run_ofctl("del-flows", [flow_expr_str])
|
||||
|
||||
def dump_flows_for_table(self, table):
|
||||
flow_str = "table=%s" % table
|
||||
flows = self.run_ofctl("dump-flows", [flow_str])
|
||||
retval = '\n'.join(item for item in flows.splitlines()
|
||||
if 'NXST' not in item)
|
||||
return retval
|
||||
|
||||
def defer_apply_on(self):
|
||||
LOG.debug(_('defer_apply_on'))
|
||||
self.defer_apply_flows = True
|
||||
|
@ -206,7 +206,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.updated_ports = set()
|
||||
self.setup_rpc()
|
||||
self.setup_integration_br()
|
||||
self.setup_physical_bridges(bridge_mappings)
|
||||
self.bridge_mappings = bridge_mappings
|
||||
self.setup_physical_bridges(self.bridge_mappings)
|
||||
self.local_vlan_map = {}
|
||||
self.tun_br_ofports = {p_const.TYPE_GRE: {},
|
||||
p_const.TYPE_VXLAN: {}}
|
||||
@ -223,6 +224,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.tunnel_count = 0
|
||||
self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
|
||||
self._check_ovs_version()
|
||||
self.tun_br = None
|
||||
if self.enable_tunneling:
|
||||
self.setup_tunnel_br(tun_br)
|
||||
# Collect additional bridges to monitor
|
||||
@ -541,16 +543,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
|
||||
'''
|
||||
|
||||
if not self.available_local_vlans:
|
||||
LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
|
||||
return
|
||||
lvid = self.available_local_vlans.pop()
|
||||
# On a restart or crash of OVS, the network associated with this VLAN
|
||||
# will already be assigned, so check for that here before assigning a
|
||||
# new one.
|
||||
lvm = self.local_vlan_map.get(net_uuid)
|
||||
if lvm:
|
||||
lvid = lvm.vlan
|
||||
else:
|
||||
if not self.available_local_vlans:
|
||||
LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
|
||||
return
|
||||
lvid = self.available_local_vlans.pop()
|
||||
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
|
||||
network_type,
|
||||
physical_network,
|
||||
segmentation_id)
|
||||
|
||||
LOG.info(_("Assigning %(vlan_id)s as local vlan for "
|
||||
"net-id=%(net_uuid)s"),
|
||||
{'vlan_id': lvid, 'net_uuid': net_uuid})
|
||||
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
|
||||
physical_network,
|
||||
segmentation_id)
|
||||
|
||||
if network_type in constants.TUNNEL_NETWORK_TYPES:
|
||||
if self.enable_tunneling:
|
||||
@ -683,7 +694,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.available_local_vlans.add(lvm.vlan)
|
||||
|
||||
def port_bound(self, port, net_uuid,
|
||||
network_type, physical_network, segmentation_id):
|
||||
network_type, physical_network, segmentation_id,
|
||||
ovs_restarted):
|
||||
'''Bind port to net_uuid/lsw_id and install flow for inbound traffic
|
||||
to vm.
|
||||
|
||||
@ -692,8 +704,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
:param network_type: the network type ('gre', 'vlan', 'flat', 'local')
|
||||
:param physical_network: the physical network for 'vlan' or 'flat'
|
||||
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
|
||||
:param ovs_restarted: indicates if this is called for an OVS restart.
|
||||
'''
|
||||
if net_uuid not in self.local_vlan_map:
|
||||
if net_uuid not in self.local_vlan_map or ovs_restarted:
|
||||
self.provision_local_vlan(net_uuid, network_type,
|
||||
physical_network, segmentation_id)
|
||||
lvm = self.local_vlan_map[net_uuid]
|
||||
@ -754,6 +767,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.int_br.remove_all_flows()
|
||||
# switch all traffic using L2 learning
|
||||
self.int_br.add_flow(priority=1, actions="normal")
|
||||
# Add a canary flow to int_br to track OVS restarts
|
||||
self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0,
|
||||
actions="drop")
|
||||
|
||||
def setup_ancillary_bridges(self, integ_br, tun_br):
|
||||
'''Setup ancillary bridges - for example br-ex.'''
|
||||
@ -781,7 +797,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
ancillary_bridges.append(br)
|
||||
return ancillary_bridges
|
||||
|
||||
def setup_tunnel_br(self, tun_br):
|
||||
def setup_tunnel_br(self, tun_br=None):
|
||||
'''Setup the tunnel bridge.
|
||||
|
||||
Creates tunnel bridge, and links it to the integration bridge
|
||||
@ -789,7 +805,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
|
||||
:param tun_br: the name of the tunnel bridge.
|
||||
'''
|
||||
self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
|
||||
if not self.tun_br:
|
||||
self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
|
||||
|
||||
self.tun_br.reset_bridge()
|
||||
self.patch_tun_ofport = self.int_br.add_patch_port(
|
||||
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
|
||||
@ -1002,7 +1020,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
'removed': removed}
|
||||
|
||||
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
|
||||
physical_network, segmentation_id, admin_state_up):
|
||||
physical_network, segmentation_id, admin_state_up,
|
||||
ovs_restarted):
|
||||
# When this function is called for a port, the port should have
|
||||
# an OVS ofport configured, as only these ports were considered
|
||||
# for being treated. If that does not happen, it is a potential
|
||||
@ -1013,7 +1032,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
if vif_port:
|
||||
if admin_state_up:
|
||||
self.port_bound(vif_port, network_id, network_type,
|
||||
physical_network, segmentation_id)
|
||||
physical_network, segmentation_id,
|
||||
ovs_restarted)
|
||||
else:
|
||||
self.port_dead(vif_port)
|
||||
else:
|
||||
@ -1071,7 +1091,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.tun_br.delete_port(port_name)
|
||||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||
|
||||
def treat_devices_added_or_updated(self, devices):
|
||||
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
||||
resync = False
|
||||
for device in devices:
|
||||
LOG.debug(_("Processing port %s"), device)
|
||||
@ -1103,7 +1123,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
details['network_type'],
|
||||
details['physical_network'],
|
||||
details['segmentation_id'],
|
||||
details['admin_state_up'])
|
||||
details['admin_state_up'],
|
||||
ovs_restarted)
|
||||
# update plugin about port status
|
||||
if details.get('admin_state_up'):
|
||||
LOG.debug(_("Setting status for %s to UP"), device)
|
||||
@ -1180,7 +1201,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
LOG.debug(_("Device %s not defined on plugin"), device)
|
||||
return resync
|
||||
|
||||
def process_network_ports(self, port_info):
|
||||
def process_network_ports(self, port_info, ovs_restarted):
|
||||
resync_a = False
|
||||
resync_b = False
|
||||
# TODO(salv-orlando): consider a solution for ensuring notifications
|
||||
@ -1203,7 +1224,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
if devices_added_updated:
|
||||
start = time.time()
|
||||
resync_a = self.treat_devices_added_or_updated(
|
||||
devices_added_updated)
|
||||
devices_added_updated, ovs_restarted)
|
||||
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
|
||||
"treat_devices_added_or_updated completed "
|
||||
"in %(elapsed).3f"),
|
||||
@ -1291,6 +1312,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
port_info.get('removed') or
|
||||
port_info.get('updated'))
|
||||
|
||||
def check_ovs_restart(self):
|
||||
# Check for the canary flow
|
||||
canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE)
|
||||
return not canary_flow
|
||||
|
||||
def rpc_loop(self, polling_manager=None):
|
||||
if not polling_manager:
|
||||
polling_manager = polling.AlwaysPoll()
|
||||
@ -1300,6 +1326,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
updated_ports_copy = set()
|
||||
ancillary_ports = set()
|
||||
tunnel_sync = True
|
||||
ovs_restarted = False
|
||||
while True:
|
||||
start = time.time()
|
||||
port_stats = {'regular': {'added': 0,
|
||||
@ -1323,7 +1350,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
except Exception:
|
||||
LOG.exception(_("Error while synchronizing tunnels"))
|
||||
tunnel_sync = True
|
||||
if self._agent_has_updates(polling_manager):
|
||||
ovs_restarted = self.check_ovs_restart()
|
||||
if ovs_restarted:
|
||||
self.setup_integration_br()
|
||||
self.setup_physical_bridges(self.bridge_mappings)
|
||||
if self.enable_tunneling:
|
||||
self.setup_tunnel_br()
|
||||
if self._agent_has_updates(polling_manager) or ovs_restarted:
|
||||
try:
|
||||
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||
"starting polling. Elapsed:%(elapsed).3f"),
|
||||
@ -1335,7 +1368,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
# between these two statements, this will be thread-safe
|
||||
updated_ports_copy = self.updated_ports
|
||||
self.updated_ports = set()
|
||||
port_info = self.scan_ports(ports, updated_ports_copy)
|
||||
reg_ports = (set() if ovs_restarted else ports)
|
||||
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
||||
ports = port_info['current']
|
||||
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||
"port information retrieved. "
|
||||
@ -1345,11 +1379,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
# Secure and wire/unwire VIFs and update their status
|
||||
# on Neutron server
|
||||
if (self._port_info_has_changes(port_info) or
|
||||
self.sg_agent.firewall_refresh_needed()):
|
||||
self.sg_agent.firewall_refresh_needed() or
|
||||
ovs_restarted):
|
||||
LOG.debug(_("Starting to process devices in:%s"),
|
||||
port_info)
|
||||
# If treat devices fails - must resync with plugin
|
||||
sync = self.process_network_ports(port_info)
|
||||
sync = self.process_network_ports(port_info,
|
||||
ovs_restarted)
|
||||
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
|
||||
"ports processed. Elapsed:%(elapsed).3f"),
|
||||
{'iter_num': self.iter_num,
|
||||
|
@ -47,6 +47,7 @@ LEARN_FROM_TUN = 10
|
||||
UCAST_TO_TUN = 20
|
||||
ARP_RESPONDER = 21
|
||||
FLOOD_TO_TUN = 22
|
||||
CANARY_TABLE = 23
|
||||
|
||||
# Map tunnel types to tables number
|
||||
TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,
|
||||
|
@ -22,10 +22,12 @@ import netaddr
|
||||
from oslo.config import cfg
|
||||
import testtools
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ovs_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.openstack.common import log
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
|
||||
from neutron.plugins.openvswitch.common import constants
|
||||
@ -147,7 +149,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
'db_get_val', return_value=str(old_local_vlan)),
|
||||
mock.patch.object(self.agent.int_br, 'delete_flows')
|
||||
) as (set_ovs_db_func, get_ovs_db_func, delete_flows_func):
|
||||
self.agent.port_bound(port, net_uuid, 'local', None, None)
|
||||
self.agent.port_bound(port, net_uuid, 'local', None, None, False)
|
||||
get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
|
||||
if new_local_vlan != old_local_vlan:
|
||||
set_ovs_db_func.assert_called_once_with(
|
||||
@ -283,7 +285,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
side_effect=Exception()),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.Mock())):
|
||||
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
|
||||
self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
|
||||
False))
|
||||
|
||||
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
||||
"""Mock treat devices added or updated.
|
||||
@ -302,7 +305,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, func_name)
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
|
||||
False))
|
||||
return func.called
|
||||
|
||||
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
||||
@ -349,7 +353,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
mock.patch.object(self.agent, 'treat_vif_port')
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||
upd_dev_down, treat_vif_port):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
|
||||
False))
|
||||
self.assertTrue(treat_vif_port.called)
|
||||
self.assertTrue(upd_dev_down.called)
|
||||
|
||||
@ -380,11 +385,12 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
mock.patch.object(self.agent, "treat_devices_removed",
|
||||
return_value=False)
|
||||
) as (setup_port_filters, device_added_updated, device_removed):
|
||||
self.assertFalse(self.agent.process_network_ports(port_info))
|
||||
self.assertFalse(self.agent.process_network_ports(port_info,
|
||||
False))
|
||||
setup_port_filters.assert_called_once_with(
|
||||
port_info['added'], port_info.get('updated', set()))
|
||||
device_added_updated.assert_called_once_with(
|
||||
port_info['added'] | port_info.get('updated', set()))
|
||||
port_info['added'] | port_info.get('updated', set()), False)
|
||||
device_removed.assert_called_once_with(port_info['removed'])
|
||||
|
||||
def test_process_network_ports(self):
|
||||
@ -800,6 +806,61 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
|
||||
self.agent.setup_tunnel_port.assert_has_calls(expected_calls)
|
||||
|
||||
def test_ovs_restart(self):
|
||||
reply2 = {'current': set(['tap0']),
|
||||
'added': set(['tap2']),
|
||||
'removed': set([])}
|
||||
|
||||
reply3 = {'current': set(['tap2']),
|
||||
'added': set([]),
|
||||
'removed': set(['tap0'])}
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(async_process.AsyncProcess, "_spawn"),
|
||||
mock.patch.object(log.ContextAdapter, 'exception'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
'scan_ports'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
'process_network_ports'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
'check_ovs_restart'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
'setup_integration_br'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
'setup_physical_bridges')
|
||||
) as (spawn_fn, log_exception, scan_ports, process_network_ports,
|
||||
check_ovs_restart, setup_int_br, setup_phys_br):
|
||||
log_exception.side_effect = Exception(
|
||||
'Fake exception to get out of the loop')
|
||||
scan_ports.side_effect = [reply2, reply3]
|
||||
process_network_ports.side_effect = [
|
||||
False, Exception('Fake exception to get out of the loop')]
|
||||
check_ovs_restart.side_effect = [False, True]
|
||||
|
||||
# This will exit after the second loop
|
||||
try:
|
||||
self.agent.daemon_loop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
scan_ports.assert_has_calls([
|
||||
mock.call(set(), set()),
|
||||
mock.call(set(), set())
|
||||
])
|
||||
process_network_ports.assert_has_calls([
|
||||
mock.call({'current': set(['tap0']),
|
||||
'removed': set([]),
|
||||
'added': set(['tap2'])}, False),
|
||||
mock.call({'current': set(['tap2']),
|
||||
'removed': set(['tap0']),
|
||||
'added': set([])}, True)
|
||||
])
|
||||
|
||||
# Verify the second time through the loop we triggered an
|
||||
# OVS restart and re-setup the bridges
|
||||
setup_int_br.assert_has_calls([mock.call()])
|
||||
setup_phys_br.assert_has_calls([mock.call({})])
|
||||
|
||||
|
||||
class AncillaryBridgesTest(base.BaseTestCase):
|
||||
|
||||
|
@ -111,6 +111,8 @@ class TunnelTest(base.BaseTestCase):
|
||||
mock.call.delete_port('patch-tun'),
|
||||
mock.call.remove_all_flows(),
|
||||
mock.call.add_flow(priority=1, actions='normal'),
|
||||
mock.call.add_flow(priority=0, table=constants.CANARY_TABLE,
|
||||
actions='drop')
|
||||
]
|
||||
|
||||
self.mock_map_tun_bridge = self.ovs_bridges[self.MAP_TUN_BRIDGE]
|
||||
@ -476,7 +478,7 @@ class TunnelTest(base.BaseTestCase):
|
||||
'sudo', 2, ['gre'],
|
||||
self.VETH_MTU)
|
||||
a.local_vlan_map[NET_UUID] = LVM
|
||||
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
|
||||
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID, False)
|
||||
self._verify_mock_calls()
|
||||
|
||||
def test_port_unbound(self):
|
||||
@ -552,6 +554,11 @@ class TunnelTest(base.BaseTestCase):
|
||||
'added': set([]),
|
||||
'removed': set(['tap0'])}
|
||||
|
||||
self.mock_int_bridge_expected += [
|
||||
mock.call.dump_flows_for_table(constants.CANARY_TABLE),
|
||||
mock.call.dump_flows_for_table(constants.CANARY_TABLE)
|
||||
]
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(log.ContextAdapter, 'exception'),
|
||||
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
|
||||
@ -590,10 +597,10 @@ class TunnelTest(base.BaseTestCase):
|
||||
process_network_ports.assert_has_calls([
|
||||
mock.call({'current': set(['tap0']),
|
||||
'removed': set([]),
|
||||
'added': set(['tap2'])}),
|
||||
'added': set(['tap2'])}, False),
|
||||
mock.call({'current': set(['tap2']),
|
||||
'removed': set(['tap0']),
|
||||
'added': set([])})
|
||||
'added': set([])}, False)
|
||||
])
|
||||
self._verify_mock_calls()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user