Merge "ofagent: Stop monitoring ovsdb for port changes"
This commit is contained in:
commit
867728c1b8
@ -33,7 +33,6 @@ from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
|
|||||||
from neutron.agent import l2population_rpc
|
from neutron.agent import l2population_rpc
|
||||||
from neutron.agent.linux import ip_lib
|
from neutron.agent.linux import ip_lib
|
||||||
from neutron.agent.linux import ovs_lib
|
from neutron.agent.linux import ovs_lib
|
||||||
from neutron.agent.linux import polling
|
|
||||||
from neutron.agent.linux import utils
|
from neutron.agent.linux import utils
|
||||||
from neutron.agent import rpc as agent_rpc
|
from neutron.agent import rpc as agent_rpc
|
||||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||||
@ -200,9 +199,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
|||||||
def __init__(self, ryuapp, integ_br, tun_br, local_ip,
|
def __init__(self, ryuapp, integ_br, tun_br, local_ip,
|
||||||
bridge_mappings, root_helper,
|
bridge_mappings, root_helper,
|
||||||
polling_interval, tunnel_types=None,
|
polling_interval, tunnel_types=None,
|
||||||
veth_mtu=None, minimize_polling=False,
|
veth_mtu=None):
|
||||||
ovsdb_monitor_respawn_interval=(
|
|
||||||
constants.DEFAULT_OVSDBMON_RESPAWN)):
|
|
||||||
"""Constructor.
|
"""Constructor.
|
||||||
|
|
||||||
:param ryuapp: object of the ryu app.
|
:param ryuapp: object of the ryu app.
|
||||||
@ -216,11 +213,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
|||||||
the agent. If set, will automatically set enable_tunneling to
|
the agent. If set, will automatically set enable_tunneling to
|
||||||
True.
|
True.
|
||||||
:param veth_mtu: MTU size for veth interfaces.
|
:param veth_mtu: MTU size for veth interfaces.
|
||||||
:param minimize_polling: Optional, whether to minimize polling by
|
|
||||||
monitoring ovsdb for interface changes.
|
|
||||||
:param ovsdb_monitor_respawn_interval: Optional, when using polling
|
|
||||||
minimization, the number of seconds to wait before respawning
|
|
||||||
the ovsdb monitor.
|
|
||||||
"""
|
"""
|
||||||
super(OFANeutronAgent, self).__init__()
|
super(OFANeutronAgent, self).__init__()
|
||||||
self.ryuapp = ryuapp
|
self.ryuapp = ryuapp
|
||||||
@ -254,8 +246,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
|||||||
p_const.TYPE_VXLAN: {}}
|
p_const.TYPE_VXLAN: {}}
|
||||||
|
|
||||||
self.polling_interval = polling_interval
|
self.polling_interval = polling_interval
|
||||||
self.minimize_polling = minimize_polling
|
|
||||||
self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
|
|
||||||
|
|
||||||
self.enable_tunneling = bool(self.tunnel_types)
|
self.enable_tunneling = bool(self.tunnel_types)
|
||||||
self.local_ip = local_ip
|
self.local_ip = local_ip
|
||||||
@ -1260,35 +1250,27 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
|||||||
resync = True
|
resync = True
|
||||||
return resync
|
return resync
|
||||||
|
|
||||||
def _agent_has_updates(self, polling_manager):
|
|
||||||
return (polling_manager.is_polling_required or
|
|
||||||
self.updated_ports or
|
|
||||||
self.sg_agent.firewall_refresh_needed())
|
|
||||||
|
|
||||||
def _port_info_has_changes(self, port_info):
|
def _port_info_has_changes(self, port_info):
|
||||||
return (port_info.get('added') or
|
return (port_info.get('added') or
|
||||||
port_info.get('removed') or
|
port_info.get('removed') or
|
||||||
port_info.get('updated'))
|
port_info.get('updated'))
|
||||||
|
|
||||||
def ovsdb_monitor_loop(self, polling_manager=None):
|
def daemon_loop(self):
|
||||||
if not polling_manager:
|
# TODO(yamamoto):
|
||||||
polling_manager = polling.AlwaysPoll()
|
# It might be better to monitor port status async messages
|
||||||
|
|
||||||
sync = True
|
sync = True
|
||||||
ports = set()
|
ports = set()
|
||||||
updated_ports_copy = set()
|
|
||||||
tunnel_sync = True
|
tunnel_sync = True
|
||||||
while True:
|
while True:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
|
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
|
||||||
LOG.debug(_("Agent ovsdb_monitor_loop - "
|
LOG.debug("Agent daemon_loop - iteration:%d started",
|
||||||
"iteration:%d started"),
|
|
||||||
self.iter_num)
|
self.iter_num)
|
||||||
if sync:
|
if sync:
|
||||||
LOG.info(_("Agent out of sync with plugin!"))
|
LOG.info(_("Agent out of sync with plugin!"))
|
||||||
ports.clear()
|
ports.clear()
|
||||||
sync = False
|
sync = False
|
||||||
polling_manager.force_polling()
|
|
||||||
# Notify the plugin of tunnel IP
|
# Notify the plugin of tunnel IP
|
||||||
if self.enable_tunneling and tunnel_sync:
|
if self.enable_tunneling and tunnel_sync:
|
||||||
LOG.info(_("Agent tunnel out of sync with plugin!"))
|
LOG.info(_("Agent tunnel out of sync with plugin!"))
|
||||||
@ -1297,82 +1279,66 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
|||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Error while synchronizing tunnels"))
|
LOG.exception(_("Error while synchronizing tunnels"))
|
||||||
tunnel_sync = True
|
tunnel_sync = True
|
||||||
if self._agent_has_updates(polling_manager):
|
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
|
||||||
try:
|
"starting polling. Elapsed:%(elapsed).3f",
|
||||||
LOG.debug(_("Agent ovsdb_monitor_loop - "
|
{'iter_num': self.iter_num,
|
||||||
"iteration:%(iter_num)d - "
|
'elapsed': time.time() - start})
|
||||||
"starting polling. Elapsed:%(elapsed).3f"),
|
try:
|
||||||
|
# Save updated ports dict to perform rollback in
|
||||||
|
# case resync would be needed, and then clear
|
||||||
|
# self.updated_ports. As the greenthread should not yield
|
||||||
|
# between these two statements, this will be thread-safe
|
||||||
|
updated_ports_copy = self.updated_ports
|
||||||
|
self.updated_ports = set()
|
||||||
|
port_info = self.scan_ports(ports, updated_ports_copy)
|
||||||
|
ports = port_info['current']
|
||||||
|
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
|
||||||
|
"port information retrieved. "
|
||||||
|
"Elapsed:%(elapsed).3f",
|
||||||
|
{'iter_num': self.iter_num,
|
||||||
|
'elapsed': time.time() - start})
|
||||||
|
# Secure and wire/unwire VIFs and update their status
|
||||||
|
# on Neutron server
|
||||||
|
if (self._port_info_has_changes(port_info) or
|
||||||
|
self.sg_agent.firewall_refresh_needed()):
|
||||||
|
LOG.debug("Starting to process devices in:%s",
|
||||||
|
port_info)
|
||||||
|
# If treat devices fails - must resync with plugin
|
||||||
|
sync = self.process_network_ports(port_info)
|
||||||
|
LOG.debug("Agent daemon_loop - "
|
||||||
|
"iteration:%(iter_num)d - "
|
||||||
|
"ports processed. Elapsed:%(elapsed).3f",
|
||||||
{'iter_num': self.iter_num,
|
{'iter_num': self.iter_num,
|
||||||
'elapsed': time.time() - start})
|
'elapsed': time.time() - start})
|
||||||
# Save updated ports dict to perform rollback in
|
port_stats['regular']['added'] = (
|
||||||
# case resync would be needed, and then clear
|
len(port_info.get('added', [])))
|
||||||
# self.updated_ports. As the greenthread should not yield
|
port_stats['regular']['updated'] = (
|
||||||
# between these two statements, this will be thread-safe
|
len(port_info.get('updated', [])))
|
||||||
updated_ports_copy = self.updated_ports
|
port_stats['regular']['removed'] = (
|
||||||
self.updated_ports = set()
|
len(port_info.get('removed', [])))
|
||||||
port_info = self.scan_ports(ports, updated_ports_copy)
|
except Exception:
|
||||||
ports = port_info['current']
|
LOG.exception(_("Error while processing VIF ports"))
|
||||||
LOG.debug(_("Agent ovsdb_monitor_loop - "
|
# Put the ports back in self.updated_port
|
||||||
"iteration:%(iter_num)d - "
|
self.updated_ports |= updated_ports_copy
|
||||||
"port information retrieved. "
|
sync = True
|
||||||
"Elapsed:%(elapsed).3f"),
|
|
||||||
{'iter_num': self.iter_num,
|
|
||||||
'elapsed': time.time() - start})
|
|
||||||
# Secure and wire/unwire VIFs and update their status
|
|
||||||
# on Neutron server
|
|
||||||
if (self._port_info_has_changes(port_info) or
|
|
||||||
self.sg_agent.firewall_refresh_needed()):
|
|
||||||
LOG.debug(_("Starting to process devices in:%s"),
|
|
||||||
port_info)
|
|
||||||
# If treat devices fails - must resync with plugin
|
|
||||||
sync = self.process_network_ports(port_info)
|
|
||||||
LOG.debug(_("Agent ovsdb_monitor_loop - "
|
|
||||||
"iteration:%(iter_num)d - "
|
|
||||||
"ports processed. Elapsed:%(elapsed).3f"),
|
|
||||||
{'iter_num': self.iter_num,
|
|
||||||
'elapsed': time.time() - start})
|
|
||||||
port_stats['regular']['added'] = (
|
|
||||||
len(port_info.get('added', [])))
|
|
||||||
port_stats['regular']['updated'] = (
|
|
||||||
len(port_info.get('updated', [])))
|
|
||||||
port_stats['regular']['removed'] = (
|
|
||||||
len(port_info.get('removed', [])))
|
|
||||||
|
|
||||||
polling_manager.polling_completed()
|
|
||||||
except Exception:
|
|
||||||
LOG.exception(_("Error while processing VIF ports"))
|
|
||||||
# Put the ports back in self.updated_port
|
|
||||||
self.updated_ports |= updated_ports_copy
|
|
||||||
sync = True
|
|
||||||
|
|
||||||
# sleep till end of polling interval
|
# sleep till end of polling interval
|
||||||
elapsed = (time.time() - start)
|
elapsed = (time.time() - start)
|
||||||
LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
|
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d "
|
||||||
"completed. Processed ports statistics:"
|
"completed. Processed ports statistics:"
|
||||||
"%(port_stats)s. Elapsed:%(elapsed).3f"),
|
"%(port_stats)s. Elapsed:%(elapsed).3f",
|
||||||
{'iter_num': self.iter_num,
|
{'iter_num': self.iter_num,
|
||||||
'port_stats': port_stats,
|
'port_stats': port_stats,
|
||||||
'elapsed': elapsed})
|
'elapsed': elapsed})
|
||||||
if (elapsed < self.polling_interval):
|
if (elapsed < self.polling_interval):
|
||||||
time.sleep(self.polling_interval - elapsed)
|
time.sleep(self.polling_interval - elapsed)
|
||||||
else:
|
else:
|
||||||
LOG.debug(_("Loop iteration exceeded interval "
|
LOG.debug("Loop iteration exceeded interval "
|
||||||
"(%(polling_interval)s vs. %(elapsed)s)!"),
|
"(%(polling_interval)s vs. %(elapsed)s)!",
|
||||||
{'polling_interval': self.polling_interval,
|
{'polling_interval': self.polling_interval,
|
||||||
'elapsed': elapsed})
|
'elapsed': elapsed})
|
||||||
self.iter_num = self.iter_num + 1
|
self.iter_num = self.iter_num + 1
|
||||||
|
|
||||||
def daemon_loop(self):
|
|
||||||
# TODO(yamamoto): make polling logic stop using ovsdb monitor
|
|
||||||
# - make it a dumb periodic polling
|
|
||||||
# - or, monitor port status async messages
|
|
||||||
with polling.get_polling_manager(
|
|
||||||
self.minimize_polling,
|
|
||||||
self.root_helper,
|
|
||||||
self.ovsdb_monitor_respawn_interval) as pm:
|
|
||||||
|
|
||||||
self.ovsdb_monitor_loop(polling_manager=pm)
|
|
||||||
|
|
||||||
|
|
||||||
def create_agent_config_map(config):
|
def create_agent_config_map(config):
|
||||||
"""Create a map of agent config parameters.
|
"""Create a map of agent config parameters.
|
||||||
@ -1392,10 +1358,8 @@ def create_agent_config_map(config):
|
|||||||
bridge_mappings=bridge_mappings,
|
bridge_mappings=bridge_mappings,
|
||||||
root_helper=config.AGENT.root_helper,
|
root_helper=config.AGENT.root_helper,
|
||||||
polling_interval=config.AGENT.polling_interval,
|
polling_interval=config.AGENT.polling_interval,
|
||||||
minimize_polling=config.AGENT.minimize_polling,
|
|
||||||
tunnel_types=config.AGENT.tunnel_types,
|
tunnel_types=config.AGENT.tunnel_types,
|
||||||
veth_mtu=config.AGENT.veth_mtu,
|
veth_mtu=config.AGENT.veth_mtu,
|
||||||
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# If enable_tunneling is TRUE, set tunnel_type to default to GRE
|
# If enable_tunneling is TRUE, set tunnel_type to default to GRE
|
||||||
|
@ -808,22 +808,6 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
|
|||||||
self.agent.reclaim_local_vlan(self.lvms[1].net)
|
self.agent.reclaim_local_vlan(self.lvms[1].net)
|
||||||
del_port_fn.assert_called_once_with(self.tun_name2)
|
del_port_fn.assert_called_once_with(self.tun_name2)
|
||||||
|
|
||||||
def test_daemon_loop_uses_polling_manager(self):
|
|
||||||
with mock.patch(
|
|
||||||
'neutron.agent.linux.polling.get_polling_manager'
|
|
||||||
) as mock_get_pm:
|
|
||||||
fake_pm = mock.Mock()
|
|
||||||
mock_get_pm.return_value = fake_pm
|
|
||||||
fake_pm.__enter__ = mock.Mock()
|
|
||||||
fake_pm.__exit__ = mock.Mock()
|
|
||||||
with mock.patch.object(
|
|
||||||
self.agent, 'ovsdb_monitor_loop'
|
|
||||||
) as mock_loop:
|
|
||||||
self.agent.daemon_loop()
|
|
||||||
mock_get_pm.assert_called_once_with(True, 'fake_helper',
|
|
||||||
constants.DEFAULT_OVSDBMON_RESPAWN)
|
|
||||||
mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
|
|
||||||
|
|
||||||
def test__setup_tunnel_port_error_negative(self):
|
def test__setup_tunnel_port_error_negative(self):
|
||||||
with contextlib.nested(
|
with contextlib.nested(
|
||||||
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
|
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user