Implements ProcessMonitor to watch over external processes

This class takes care of all the spawned external processes,
taking the administrator-configured action in case any
of the external processes dies unexpectedly.

Implements: blueprint agent-child-processes-status

Change-Id: I6bc7a415dde5723dac07589859796c2ffeef5b54
This commit is contained in:
Miguel Angel Ajo 2014-08-18 12:59:32 +02:00
parent e3b4bfa8e4
commit 26039d2ff1
7 changed files with 423 additions and 11 deletions

View File

@ -13,21 +13,35 @@
# under the License.
#
# @author: Mark McClain, DreamHost
import collections
import eventlet
from oslo.config import cfg
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Configuration options for external process handling. They are registered
# on the global cfg.CONF right after the list is built.
OPTS = [
# Directory for child pid files; ProcessManager falls back to this value
# when no explicit pids_path is given.
cfg.StrOpt('external_pids',
default='$state_path/external/pids',
help=_('Location to store child pid files')),
# Master switch: ProcessMonitor only spawns its checking thread when set.
cfg.BoolOpt('check_child_processes', default=False,
help=_("Periodically check child processes")),
# Selects the ProcessMonitor handler ("_<action>_action") to run for a
# child that is found dead.
cfg.StrOpt('check_child_processes_action', default='respawn',
choices=['respawn', 'exit'],
help=_('Action to be executed when a child process dies')),
# Sleep time, in seconds, between two liveness checks.
cfg.IntOpt('check_child_processes_interval', default=60,
help=_('Interval between checks of child process liveness '
'(seconds)')),
]
cfg.CONF.register_opts(OPTS)
@ -37,23 +51,33 @@ class ProcessManager(object):
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo',
namespace=None, service=None, pids_path=None):
namespace=None, service=None, pids_path=None,
default_cmd_callback=None,
cmd_addl_env=None):
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
if service:
self.service_pid_fname = 'pid.' + service
else:
self.service_pid_fname = 'pid'
self.default_cmd_callback = default_cmd_callback
self.cmd_addl_env = cmd_addl_env
self.pids_path = pids_path or self.conf.external_pids
def enable(self, cmd_callback, reload_cfg=False):
if service:
self.service_pid_fname = 'pid.' + service
self.service = service
else:
self.service_pid_fname = 'pid'
self.service = 'default-service'
def enable(self, cmd_callback=None, reload_cfg=False):
if not self.active:
if not cmd_callback:
cmd_callback = self.default_cmd_callback
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd)
ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
elif reload_cfg:
self.reload_cfg()
@ -105,3 +129,130 @@ class ProcessManager(object):
return self.uuid in f.readline()
except IOError:
return False
# Key under which ProcessMonitor stores each ProcessManager: the resource
# uuid plus the (possibly None) service name.
ServiceId = collections.namedtuple('ServiceId', ['uuid', 'service'])
class ProcessMonitor(object):
    """Own a set of ProcessManagers and react when one of them dies.

    Managers are stored in a dict keyed by ServiceId(uuid, service). When
    the ``check_child_processes`` option is set, a green thread checks them
    every ``check_child_processes_interval`` seconds and runs the handler
    selected by ``check_child_processes_action`` for each inactive one.
    """

    def __init__(self, config, root_helper, resource_type, exit_handler):
        """Handle multiple process managers and watch over all of them.

        :param config: oslo config object with the agent configuration.
        :type config: oslo.config.ConfigOpts
        :param root_helper: root helper to be used with new ProcessManagers
        :type root_helper: str
        :param resource_type: can be dhcp, router, load_balancer, etc.
        :type resource_type: str
        :param exit_handler: function to execute when agent exit has to
                             be executed, it should take care of actual
                             exit
        :type exit_handler: function
        """
        self._config = config
        self._root_helper = root_helper
        self._resource_type = resource_type
        self._exit_handler = exit_handler

        self._process_managers = {}

        if self._config.check_child_processes:
            self._spawn_checking_thread()

    def enable(self, uuid, cmd_callback, namespace=None, service=None,
               reload_cfg=False, cmd_addl_env=None):
        """Creates a process and ensures that it is monitored.

        It will create a new ProcessManager and tie it to the uuid/service.

        :param uuid: uuid of the resource the process belongs to.
        :param cmd_callback: callable stored as the manager's default
                             command callback, so a later respawn can
                             call enable() without arguments.
        :param namespace: network namespace to run the process in.
        :param service: optional service name, part of the monitor key.
        :param reload_cfg: forwarded to ProcessManager.enable().
        :param cmd_addl_env: additional environment handed to the manager.
        """
        process_manager = ProcessManager(conf=self._config,
                                         uuid=uuid,
                                         root_helper=self._root_helper,
                                         namespace=namespace,
                                         service=service,
                                         default_cmd_callback=cmd_callback,
                                         cmd_addl_env=cmd_addl_env)

        process_manager.enable(reload_cfg=reload_cfg)

        # Only start tracking the manager once enable() has returned.
        service_id = ServiceId(uuid, service)
        self._process_managers[service_id] = process_manager

    def disable(self, uuid, namespace=None, service=None):
        """Disables the process and stops monitoring it."""
        service_id = ServiceId(uuid, service)
        process_manager = self._process_managers.pop(service_id, None)

        # we could be trying to disable a process_manager which was
        # started on a separate run of this agent, or during netns-cleanup
        # therefore we won't know about such uuid and we need to
        # build the process_manager to kill it
        if not process_manager:
            process_manager = ProcessManager(conf=self._config,
                                             uuid=uuid,
                                             root_helper=self._root_helper,
                                             namespace=namespace,
                                             service=service)

        process_manager.disable()

    def disable_all(self):
        """Disable every process currently tracked by this monitor."""
        # disable() pops entries from the dict, so walk a snapshot of the
        # keys; iterating the live dict (or a Python 3 keys() view) would
        # raise "dictionary changed size during iteration".
        for service_id in list(self._process_managers):
            self.disable(uuid=service_id.uuid, service=service_id.service)

    def get_process_manager(self, uuid, service=None):
        """Returns a process manager for manipulation"""
        service_id = ServiceId(uuid, service)
        return self._process_managers.get(service_id)

    def _get_process_manager_attribute(self, attribute, uuid, service=None):
        """Return ``attribute`` of the tracked manager, or False if unknown."""
        process_manager = self.get_process_manager(uuid, service)
        if process_manager:
            return getattr(process_manager, attribute)
        else:
            return False

    def is_active(self, uuid, service=None):
        """Return the manager's ``active`` value, False if not tracked."""
        return self._get_process_manager_attribute('active', uuid, service)

    def get_pid(self, uuid, service=None):
        """Return the manager's ``pid`` value, False if not tracked."""
        return self._get_process_manager_attribute('pid', uuid, service)

    def _spawn_checking_thread(self):
        """Start the green thread that drives the periodic checks."""
        eventlet.spawn(self._periodic_checking_thread)

    @lockutils.synchronized("_check_child_processes")
    def _check_child_processes(self):
        """Run the configured action for every tracked, inactive process."""
        # Build the list of keys before looping: the loop yields to other
        # green threads, which may add or remove managers in the meantime
        # and would otherwise trigger a RuntimeError on the live dict.
        for service_id in list(self._process_managers):
            # The entry may have been removed since the snapshot was taken.
            pm = self._process_managers.get(service_id)

            if pm and not pm.active:
                LOG.error(_LE("%(service)s for %(resource_type)s "
                              "with uuid %(uuid)s not found. "
                              "The process should not have died"),
                          {'service': pm.service,
                           'resource_type': self._resource_type,
                           'uuid': service_id.uuid})
                self._execute_action(service_id)
            # Give other green threads a chance to run between checks.
            eventlet.sleep(0)

    def _periodic_checking_thread(self):
        """Loop forever, launching one check per configured interval."""
        while True:
            eventlet.sleep(self._config.check_child_processes_interval)
            eventlet.spawn(self._check_child_processes)

    def _execute_action(self, service_id):
        """Dispatch to ``_<check_child_processes_action>_action``."""
        action_function = getattr(
            self, "_%s_action" % self._config.check_child_processes_action)
        action_function(service_id)

    def _respawn_action(self, service_id):
        """Re-enable the dead process through its ProcessManager."""
        LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
                  {'service': service_id.service,
                   'uuid': service_id.uuid})
        # No callback is passed; the manager was created with a default one.
        self._process_managers[service_id].enable()

    def _exit_action(self, service_id):
        """Hand control to the agent-provided exit handler."""
        LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
                      "actions"))
        self._exit_handler(service_id.uuid, service_id.service)

View File

@ -0,0 +1,55 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
from neutron.agent.linux import daemon
def main():
    """Parse --uuid / --pid_file from the command line and start the daemon."""

    class SimpleDaemon(daemon.Daemon):
        """Example / dummy daemon, usable from functional tests.

        It does nothing apart from what daemon.Daemon does on start and
        then sleeping forever.
        """

        def run(self):
            # Nothing to do: just stay alive.
            while True:
                eventlet.sleep(10)

    eventlet.monkey_patch()

    uuid_opt = cfg.StrOpt('uuid',
                          help=_('uuid provided from the command line '
                                 'so external_process can track us via /proc/'
                                 'cmdline interface.'),
                          required=True)
    pid_file_opt = cfg.StrOpt('pid_file',
                              help=_('Location of pid file of this process.'),
                              required=True)
    cfg.CONF.register_cli_opts([uuid_opt, pid_file_opt])

    # Don't get the default configuration file
    cfg.CONF(project='neutron', default_config_files=[])

    SimpleDaemon(cfg.CONF.pid_file, uuid=cfg.CONF.uuid).start()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,107 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
from six import moves
from neutron.agent.linux import external_process
from neutron.tests.functional.agent.linux import simple_daemon
from neutron.tests.functional import base
# Template for the uuid given to each spawned child; filled with the
# child's index.
UUID_FORMAT = "test-uuid-%d"
class BaseTestProcessMonitor(base.BaseSudoTestCase):
    """Helpers to spawn simple_daemon children under a ProcessMonitor."""

    def setUp(self):
        super(BaseTestProcessMonitor, self).setUp()
        self._exit_handler_called = False
        # Enable the periodic check and make it run every second.
        cfg.CONF.set_override('check_child_processes', True)
        cfg.CONF.set_override('check_child_processes_interval', 1)
        self._child_processes = []
        self._ext_processes = None
        self.addCleanup(self.cleanup_spawned_children)

    def create_child_processes_manager(self, action):
        """Build the monitor under test with the given dead-child action."""
        cfg.CONF.set_override('check_child_processes_action', action)
        monitor = external_process.ProcessMonitor(
            config=cfg.CONF,
            root_helper=None,
            resource_type='test',
            exit_handler=self._exit_handler)
        self._ext_processes = monitor

    def _exit_handler(self, uuid, service):
        # Record the call so a test can inspect it afterwards.
        self._exit_handler_called = True
        self._exit_handler_params = (uuid, service)

    def _make_cmdline_callback(self, uuid):
        """Return a callback producing the simple_daemon command line."""

        def _cmdline_callback(pidfile):
            return ["python", simple_daemon.__file__,
                    "--uuid=%s" % uuid,
                    "--pid_file=%s" % pidfile]

        return _cmdline_callback

    def _spawn_n_children(self, n, service=None):
        """Enable n children and remember their process managers."""
        self._child_processes = []
        for index in moves.xrange(n):
            child_uuid = self._child_uuid(index)
            self._ext_processes.enable(
                uuid=child_uuid,
                cmd_callback=self._make_cmdline_callback(child_uuid),
                service=service)
            self._child_processes.append(
                self._ext_processes.get_process_manager(child_uuid, service))

    @staticmethod
    def _child_uuid(child_number):
        return UUID_FORMAT % child_number

    def _kill_last_child(self):
        last_child = self._child_processes[-1]
        last_child.disable()

    def spawn_child_processes_and_kill_last(self, service=None, number=2):
        """Spawn ``number`` children, kill the last, check it is down."""
        self._spawn_n_children(number, service)
        self._kill_last_child()
        self.assertFalse(self._child_processes[-1].active)

    def wait_for_all_childs_respawned(self):
        """Block until every remembered child reports active again."""
        self._wait_for_condition(
            lambda: all(pm.active for pm in self._child_processes))

    def _wait_for_condition(self, exit_condition, extra_time=5):
        # we need to allow extra_time for the check process to happen
        # and properly execute action over the gone processes under
        # high load conditions
        deadline = cfg.CONF.check_child_processes_interval + extra_time
        with self.assert_max_execution_time(deadline):
            while not exit_condition():
                eventlet.sleep(0.01)

    def cleanup_spawned_children(self):
        """Disable everything the monitor tracks, if a monitor was built."""
        if not self._ext_processes:
            return
        self._ext_processes.disable_all()
class TestProcessMonitor(BaseTestProcessMonitor):
    """Functional checks of ProcessMonitor against real child processes."""

    def test_respawn_handler(self):
        # With the 'respawn' action, a killed child has to come back.
        self.create_child_processes_manager('respawn')
        self.spawn_child_processes_and_kill_last()
        self.wait_for_all_childs_respawned()

View File

@ -0,0 +1,99 @@
# Copyright 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from neutron.agent.linux import external_process
from neutron.tests import base
# Fixture values shared by the ProcessMonitor unit tests below.
TEST_UUID = 'test-uuid'
TEST_SERVICE1 = 'testsvc'
TEST_PID = 1234
class BaseTestProcessMonitor(base.BaseTestCase):
    """Set up a ProcessMonitor with ProcessManager, LOG.error and spawn mocked."""

    def setUp(self):
        super(BaseTestProcessMonitor, self).setUp()

        # Every ProcessManager(...) call yields a fresh Mock.
        self.pm_patch = mock.patch(
            "neutron.agent.linux.external_process."
            "ProcessManager",
            side_effect=mock.Mock)
        self.pmanager = self.pm_patch.start()

        self.log_patch = mock.patch(
            "neutron.agent.linux.external_process."
            "LOG.error")
        self.error_log = self.log_patch.start()

        # Keep the monitor from starting its checking green thread.
        self.spawn_patch = mock.patch("eventlet.spawn")
        self.eventlent_spawn = self.spawn_patch.start()

        # create a default process monitor
        self.create_child_process_monitor('respawn')

    def create_child_process_monitor(self, action):
        """(Re)build self.pmonitor using ``action`` as the dead-child action."""
        self.exit_handler = mock.Mock()
        fake_conf = mock.Mock(check_child_processes_action=action,
                              check_child_processes=True)
        self.pmonitor = external_process.ProcessMonitor(
            config=fake_conf,
            root_helper=None,
            resource_type='test',
            exit_handler=self.exit_handler)

    def get_monitored_process_manager(self, uuid, service=None):
        """Enable uuid/service on the monitor and return its manager."""
        monitor = self.pmonitor
        monitor.enable(uuid=uuid, service=service, cmd_callback=None)
        return monitor.get_process_manager(uuid, service)
class TestProcessMonitor(BaseTestProcessMonitor):
    """Unit tests for ProcessMonitor using mocked process managers."""

    def test_error_logged(self):
        # An inactive manager must be reported through LOG.error.
        pm = self.get_monitored_process_manager(TEST_UUID)
        pm.active = False
        self.pmonitor._check_child_processes()
        self.assertTrue(self.error_log.called)

    def test_exit_handler(self):
        # With the 'exit' action the agent-provided handler gets the key
        # (uuid, service) of the dead process.
        self.create_child_process_monitor('exit')
        pm = self.get_monitored_process_manager(TEST_UUID)
        pm.active = False
        self.pmonitor._check_child_processes()
        self.exit_handler.assert_called_once_with(TEST_UUID, None)

    def test_different_service_types(self):
        # Same uuid, different service -> distinct managers.
        pm_none = self.get_monitored_process_manager(TEST_UUID)
        pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
        self.assertNotEqual(pm_none, pm_svc1)

    def test_active_method(self, service=None):
        pm = self.get_monitored_process_manager(TEST_UUID, service)
        pm.active = False
        self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
        pm.active = True
        self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))

    def test_active_method_with_service(self):
        self.test_active_method(TEST_SERVICE1)

    def test_pid_method(self, service=None):
        pm = self.get_monitored_process_manager(TEST_UUID, service)
        pm.pid = TEST_PID
        self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))

    def test_pid_method_with_service(self):
        # The argument is the service name; the original passed TEST_PID
        # here, so the named-service path was never actually exercised.
        self.test_pid_method(TEST_SERVICE1)

    def test_pid_method_unknown_uuid(self):
        # Unknown keys yield False rather than raising.
        self.assertFalse(self.pmonitor.get_pid('bad-uuid'))

View File

@ -665,7 +665,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
mock.ANY,
'--debug',
('--log-file=neutron-ns-metadata-proxy-%s.log' %
fake_meta_network.id)])
fake_meta_network.id)], addl_env=None)
])
finally:
self.external_process_p.start()

View File

@ -2179,7 +2179,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
'--debug',
'--log-file=neutron-ns-metadata-proxy-%s.log' %
router_id
])
], addl_env=None)
])
finally:
self.external_process_p.start()

View File

@ -61,8 +61,8 @@ class TestProcessManager(base.BaseTestCase):
name.assert_called_once_with(ensure_pids_dir=True)
ip_lib.assert_has_calls([
mock.call.IPWrapper('sudo', 'ns'),
mock.call.IPWrapper().netns.execute(['the', 'cmd'])]
)
mock.call.IPWrapper().netns.execute(['the', 'cmd'],
addl_env=None)])
def test_enable_with_namespace_process_active(self):
callback = mock.Mock()