Prevent L3 agent looping calls from hanging

This patch adopts several measures to prevent _sync_routers_task
and _rpc_loop from hanging because of subprocess.Popen.communicate
not returning.

1) Perform a sleep everytime a command is completed, similarly to
what is done in openstack.common.processutils.execute
2) Disable by default GARP, as kernel crashes caused by arping
have been observed
3) Prevent a non-critical keyerror in _router_removed from triggering
again a full sync, which might put the system under significant load.

This patch also adds debug log statements aimed at improving the
ability of debugging similar failures.

Change-Id: I003316bce0f38b7d2ea7d563b5a0a58676834398
Partial-Bug: 1224001
This commit is contained in:
Salvatore Orlando 2013-10-02 12:14:14 -07:00
parent 03436540ed
commit 2d69f32fb5
3 changed files with 39 additions and 21 deletions

View File

@ -163,10 +163,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
help=_("TCP Port used by Neutron metadata namespace " help=_("TCP Port used by Neutron metadata namespace "
"proxy.")), "proxy.")),
cfg.IntOpt('send_arp_for_ha', cfg.IntOpt('send_arp_for_ha',
default=3, default=0,
help=_("Send this many gratuitous ARPs for HA setup, " help=_("Send this many gratuitous ARPs for HA setup, if "
"set it below or equal to 0 to disable this " "less than or equal to 0, the feature is disabled")),
"feature.")),
cfg.BoolOpt('use_namespaces', default=True, cfg.BoolOpt('use_namespaces', default=True,
help=_("Allow overlapping IP.")), help=_("Allow overlapping IP.")),
cfg.StrOpt('router_id', default='', cfg.StrOpt('router_id', default='',
@ -308,7 +307,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self._spawn_metadata_proxy(ri) self._spawn_metadata_proxy(ri)
def _router_removed(self, router_id): def _router_removed(self, router_id):
ri = self.router_info[router_id] ri = self.router_info.get(router_id)
if ri is None:
LOG.warn(_("Info for router %s were not found. "
"Skipping router removal"), router_id)
return
ri.router['gw_port'] = None ri.router['gw_port'] = None
ri.router[l3_constants.INTERFACE_KEY] = [] ri.router[l3_constants.INTERFACE_KEY] = []
ri.router[l3_constants.FLOATINGIP_KEY] = [] ri.router[l3_constants.FLOATINGIP_KEY] = []
@ -706,6 +709,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
# so we can clear the value of updated_routers # so we can clear the value of updated_routers
# and removed_routers # and removed_routers
try: try:
LOG.debug(_("Starting RPC loop for %d updated routers"),
len(self.updated_routers))
if self.updated_routers: if self.updated_routers:
router_ids = list(self.updated_routers) router_ids = list(self.updated_routers)
self.updated_routers.clear() self.updated_routers.clear()
@ -713,6 +718,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self.context, router_ids) self.context, router_ids)
self._process_routers(routers) self._process_routers(routers)
self._process_router_delete() self._process_router_delete()
LOG.debug(_("RPC loop successfully completed"))
except Exception: except Exception:
LOG.exception(_("Failed synchronizing routers")) LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True self.fullsync = True
@ -732,6 +738,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
def _sync_routers_task(self, context): def _sync_routers_task(self, context):
if self.services_sync: if self.services_sync:
super(L3NATAgent, self).process_services_sync(context) super(L3NATAgent, self).process_services_sync(context)
LOG.debug(_("Starting _sync_routers_task - fullsync:%s"),
self.fullsync)
if not self.fullsync: if not self.fullsync:
return return
try: try:
@ -744,6 +752,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
LOG.debug(_('Processing :%r'), routers) LOG.debug(_('Processing :%r'), routers)
self._process_routers(routers, all_routers=True) self._process_routers(routers, all_routers=True)
self.fullsync = False self.fullsync = False
LOG.debug(_("_sync_routers_task successfully completed"))
except Exception: except Exception:
LOG.exception(_("Failed synchronizing routers")) LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True self.fullsync = True
@ -809,6 +818,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
self.heartbeat.start(interval=report_interval) self.heartbeat.start(interval=report_interval)
def _report_state(self): def _report_state(self):
LOG.debug(_("Report state task started"))
num_ex_gw_ports = 0 num_ex_gw_ports = 0
num_interfaces = 0 num_interfaces = 0
num_floating_ips = 0 num_floating_ips = 0
@ -832,6 +842,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
self.use_call) self.use_call)
self.agent_state.pop('start_flag', None) self.agent_state.pop('start_flag', None)
self.use_call = False self.use_call = False
LOG.debug(_("Report state task successfully completed"))
except AttributeError: except AttributeError:
# This means the server does not support report_state # This means the server does not support report_state
LOG.warn(_("Neutron server does not support state report." LOG.warn(_("Neutron server does not support state report."

View File

@ -25,6 +25,7 @@ import struct
import tempfile import tempfile
from eventlet.green import subprocess from eventlet.green import subprocess
from eventlet import greenthread
from neutron.common import utils from neutron.common import utils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -43,22 +44,27 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
env = os.environ.copy() env = os.environ.copy()
if addl_env: if addl_env:
env.update(addl_env) env.update(addl_env)
obj = utils.subprocess_popen(cmd, shell=False, try:
stdin=subprocess.PIPE, obj = utils.subprocess_popen(cmd, shell=False,
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE, stdout=subprocess.PIPE,
env=env) stderr=subprocess.PIPE,
env=env)
_stdout, _stderr = (process_input and _stdout, _stderr = (process_input and
obj.communicate(process_input) or obj.communicate(process_input) or
obj.communicate()) obj.communicate())
obj.stdin.close() obj.stdin.close()
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdout: %(stdout)r\n" m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdout: %(stdout)r\n"
"Stderr: %(stderr)r") % {'cmd': cmd, 'code': obj.returncode, "Stderr: %(stderr)r") % {'cmd': cmd, 'code': obj.returncode,
'stdout': _stdout, 'stderr': _stderr} 'stdout': _stdout, 'stderr': _stderr}
LOG.debug(m) LOG.debug(m)
if obj.returncode and check_exit_code: if obj.returncode and check_exit_code:
raise RuntimeError(m) raise RuntimeError(m)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
return return_stderr and (_stdout, _stderr) or _stdout return return_stderr and (_stdout, _stderr) or _stdout

View File

@ -46,6 +46,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
self.conf.set_override('router_id', 'fake_id') self.conf.set_override('router_id', 'fake_id')
self.conf.set_override('interface_driver', self.conf.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver') 'neutron.agent.linux.interface.NullDriver')
self.conf.set_override('send_arp_for_ha', 1)
self.conf.root_helper = 'sudo' self.conf.root_helper = 'sudo'
self.device_exists_p = mock.patch( self.device_exists_p = mock.patch(