Merge "Implements ProcessMonitor to watch over external processes"

This commit is contained in:
Jenkins 2014-09-03 16:27:24 +00:00 committed by Gerrit Code Review
commit 8398ffc509
7 changed files with 423 additions and 11 deletions

View File

@ -13,21 +13,35 @@
# under the License.
#
# @author: Mark McClain, DreamHost
import collections
import eventlet
from oslo.config import cfg
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('external_pids',
default='$state_path/external/pids',
help=_('Location to store child pid files')),
cfg.BoolOpt('check_child_processes', default=False,
help=_("Periodically check child processes")),
cfg.StrOpt('check_child_processes_action', default='respawn',
choices=['respawn', 'exit'],
help=_('Action to be executed when a child process dies')),
cfg.IntOpt('check_child_processes_interval', default=60,
help=_('Interval between checks of child process liveness '
'(seconds)')),
]
cfg.CONF.register_opts(OPTS)
@ -37,23 +51,33 @@ class ProcessManager(object):
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo',
namespace=None, service=None, pids_path=None):
namespace=None, service=None, pids_path=None,
default_cmd_callback=None,
cmd_addl_env=None):
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
if service:
self.service_pid_fname = 'pid.' + service
else:
self.service_pid_fname = 'pid'
self.default_cmd_callback = default_cmd_callback
self.cmd_addl_env = cmd_addl_env
self.pids_path = pids_path or self.conf.external_pids
def enable(self, cmd_callback, reload_cfg=False):
if service:
self.service_pid_fname = 'pid.' + service
self.service = service
else:
self.service_pid_fname = 'pid'
self.service = 'default-service'
def enable(self, cmd_callback=None, reload_cfg=False):
if not self.active:
if not cmd_callback:
cmd_callback = self.default_cmd_callback
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd)
ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
elif reload_cfg:
self.reload_cfg()
@ -105,3 +129,130 @@ class ProcessManager(object):
return self.uuid in f.readline()
except IOError:
return False
ServiceId = collections.namedtuple('ServiceId', ['uuid', 'service'])
class ProcessMonitor(object):
def __init__(self, config, root_helper, resource_type, exit_handler):
"""Handle multiple process managers and watch over all of them.
:param config: oslo config object with the agent configuration.
:type config: oslo.config.ConfigOpts
:param root_helper: root helper to be used with new ProcessManagers
:type root_helper: str
:param resource_type: can be dhcp, router, load_balancer, etc.
:type resource_type: str
:param exit_handler: function to execute when agent exit has to
be executed, it should take care of actual
exit
:type exit_hanlder: function
"""
self._config = config
self._root_helper = root_helper
self._resource_type = resource_type
self._exit_handler = exit_handler
self._process_managers = {}
if self._config.check_child_processes:
self._spawn_checking_thread()
def enable(self, uuid, cmd_callback, namespace=None, service=None,
reload_cfg=False, cmd_addl_env=None):
"""Creates a process and ensures that it is monitored.
It will create a new ProcessManager and tie it to the uuid/service.
"""
process_manager = ProcessManager(conf=self._config,
uuid=uuid,
root_helper=self._root_helper,
namespace=namespace,
service=service,
default_cmd_callback=cmd_callback,
cmd_addl_env=cmd_addl_env)
process_manager.enable(reload_cfg=reload_cfg)
service_id = ServiceId(uuid, service)
self._process_managers[service_id] = process_manager
def disable(self, uuid, namespace=None, service=None):
"""Disables the process and stops monitoring it."""
service_id = ServiceId(uuid, service)
process_manager = self._process_managers.pop(service_id, None)
# we could be trying to disable a process_manager which was
# started on a separate run of this agent, or during netns-cleanup
# therefore we won't know about such uuid and we need to
# build the process_manager to kill it
if not process_manager:
process_manager = ProcessManager(conf=self._config,
uuid=uuid,
root_helper=self._root_helper,
namespace=namespace,
service=service)
process_manager.disable()
def disable_all(self):
for service_id in self._process_managers.keys():
self.disable(uuid=service_id.uuid, service=service_id.service)
def get_process_manager(self, uuid, service=None):
"""Returns a process manager for manipulation"""
service_id = ServiceId(uuid, service)
return self._process_managers.get(service_id)
def _get_process_manager_attribute(self, attribute, uuid, service=None):
process_manager = self.get_process_manager(uuid, service)
if process_manager:
return getattr(process_manager, attribute)
else:
return False
def is_active(self, uuid, service=None):
return self._get_process_manager_attribute('active', uuid, service)
def get_pid(self, uuid, service=None):
return self._get_process_manager_attribute('pid', uuid, service)
def _spawn_checking_thread(self):
eventlet.spawn(self._periodic_checking_thread)
@lockutils.synchronized("_check_child_processes")
def _check_child_processes(self):
for service_id in self._process_managers:
pm = self._process_managers.get(service_id)
if pm and not pm.active:
LOG.error(_LE("%(service)s for %(resource_type)s "
"with uuid %(uuid)s not found. "
"The process should not have died"),
{'service': pm.service,
'resource_type': self._resource_type,
'uuid': service_id.uuid})
self._execute_action(service_id)
eventlet.sleep(0)
def _periodic_checking_thread(self):
while True:
eventlet.sleep(self._config.check_child_processes_interval)
eventlet.spawn(self._check_child_processes)
def _execute_action(self, service_id):
action_function = getattr(
self, "_%s_action" % self._config.check_child_processes_action)
action_function(service_id)
def _respawn_action(self, service_id):
LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
{'service': service_id.service,
'uuid': service_id.uuid})
self._process_managers[service_id].enable()
def _exit_action(self, service_id):
LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
"actions"))
self._exit_handler(service_id.uuid, service_id.service)

View File

@ -0,0 +1,55 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
from neutron.agent.linux import daemon
def main():
class SimpleDaemon(daemon.Daemon):
"""The purpose of this daemon is to serve as an example, and also as
a dummy daemon, which can be invoked by functional testing, it
does nothing but setting the pid file, and staying detached in the
background.
"""
def run(self):
while True:
eventlet.sleep(10)
eventlet.monkey_patch()
opts = [
cfg.StrOpt('uuid',
help=_('uuid provided from the command line '
'so external_process can track us via /proc/'
'cmdline interface.'),
required=True),
cfg.StrOpt('pid_file',
help=_('Location of pid file of this process.'),
required=True)
]
cfg.CONF.register_cli_opts(opts)
# Don't get the default configuration file
cfg.CONF(project='neutron', default_config_files=[])
simple_daemon = SimpleDaemon(cfg.CONF.pid_file,
uuid=cfg.CONF.uuid)
simple_daemon.start()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,107 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
from six import moves
from neutron.agent.linux import external_process
from neutron.tests.functional.agent.linux import simple_daemon
from neutron.tests.functional import base
UUID_FORMAT = "test-uuid-%d"
class BaseTestProcessMonitor(base.BaseSudoTestCase):
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
self._exit_handler_called = False
cfg.CONF.set_override('check_child_processes', True)
cfg.CONF.set_override('check_child_processes_interval', 1)
self._child_processes = []
self._ext_processes = None
self.addCleanup(self.cleanup_spawned_children)
def create_child_processes_manager(self, action):
cfg.CONF.set_override('check_child_processes_action', action)
self._ext_processes = external_process.ProcessMonitor(
config=cfg.CONF,
root_helper=None,
resource_type='test',
exit_handler=self._exit_handler)
def _exit_handler(self, uuid, service):
self._exit_handler_called = True
self._exit_handler_params = (uuid, service)
def _make_cmdline_callback(self, uuid):
def _cmdline_callback(pidfile):
cmdline = ["python", simple_daemon.__file__,
"--uuid=%s" % uuid,
"--pid_file=%s" % pidfile]
return cmdline
return _cmdline_callback
def _spawn_n_children(self, n, service=None):
self._child_processes = []
for child_number in moves.xrange(n):
uuid = self._child_uuid(child_number)
_callback = self._make_cmdline_callback(uuid)
self._ext_processes.enable(uuid=uuid,
cmd_callback=_callback,
service=service)
pm = self._ext_processes.get_process_manager(uuid, service)
self._child_processes.append(pm)
@staticmethod
def _child_uuid(child_number):
return UUID_FORMAT % child_number
def _kill_last_child(self):
self._child_processes[-1].disable()
def spawn_child_processes_and_kill_last(self, service=None, number=2):
self._spawn_n_children(number, service)
self._kill_last_child()
self.assertFalse(self._child_processes[-1].active)
def wait_for_all_childs_respawned(self):
def all_childs_active():
return all(pm.active for pm in self._child_processes)
self._wait_for_condition(all_childs_active)
def _wait_for_condition(self, exit_condition, extra_time=5):
# we need to allow extra_time for the check process to happen
# and properly execute action over the gone processes under
# high load conditions
max_wait_time = cfg.CONF.check_child_processes_interval + extra_time
with self.assert_max_execution_time(max_wait_time):
while not exit_condition():
eventlet.sleep(0.01)
def cleanup_spawned_children(self):
if self._ext_processes:
self._ext_processes.disable_all()
class TestProcessMonitor(BaseTestProcessMonitor):
def test_respawn_handler(self):
self.create_child_processes_manager('respawn')
self.spawn_child_processes_and_kill_last()
self.wait_for_all_childs_respawned()

View File

@ -0,0 +1,99 @@
# Copyright 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from neutron.agent.linux import external_process
from neutron.tests import base
TEST_UUID = 'test-uuid'
TEST_SERVICE1 = 'testsvc'
TEST_PID = 1234
class BaseTestProcessMonitor(base.BaseTestCase):
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
self.pm_patch = mock.patch("neutron.agent.linux.external_process."
"ProcessManager", side_effect=mock.Mock)
self.pmanager = self.pm_patch.start()
self.log_patch = mock.patch("neutron.agent.linux.external_process."
"LOG.error")
self.error_log = self.log_patch.start()
self.spawn_patch = mock.patch("eventlet.spawn")
self.eventlent_spawn = self.spawn_patch.start()
# create a default process monitor
self.create_child_process_monitor('respawn')
def create_child_process_monitor(self, action):
self.exit_handler = mock.Mock()
conf = mock.Mock()
conf.check_child_processes_action = action
conf.check_child_processes = True
self.pmonitor = external_process.ProcessMonitor(
config=conf,
root_helper=None,
resource_type='test',
exit_handler=self.exit_handler)
def get_monitored_process_manager(self, uuid, service=None):
self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
return self.pmonitor.get_process_manager(uuid, service)
class TestProcessMonitor(BaseTestProcessMonitor):
def test_error_logged(self):
pm = self.get_monitored_process_manager(TEST_UUID)
pm.active = False
self.pmonitor._check_child_processes()
self.assertTrue(self.error_log.called)
def test_exit_handler(self):
self.create_child_process_monitor('exit')
pm = self.get_monitored_process_manager(TEST_UUID)
pm.active = False
self.pmonitor._check_child_processes()
self.exit_handler.assert_called_once_with(TEST_UUID, None)
def test_different_service_types(self):
pm_none = self.get_monitored_process_manager(TEST_UUID)
pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
self.assertNotEqual(pm_none, pm_svc1)
def test_active_method(self, service=None):
pm = self.get_monitored_process_manager(TEST_UUID, service)
pm.active = False
self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
pm.active = True
self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
def test_active_method_with_service(self):
self.test_active_method(TEST_SERVICE1)
def test_pid_method(self, service=None):
pm = self.get_monitored_process_manager(TEST_UUID, service)
pm.pid = TEST_PID
self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
def test_pid_method_with_service(self):
self.test_pid_method(TEST_PID)
def test_pid_method_unknown_uuid(self):
self.assertFalse(self.pmonitor.get_pid('bad-uuid'))

View File

@ -665,7 +665,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
mock.ANY,
'--debug',
('--log-file=neutron-ns-metadata-proxy-%s.log' %
fake_meta_network.id)])
fake_meta_network.id)], addl_env=None)
])
finally:
self.external_process_p.start()

View File

@ -2179,7 +2179,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
'--debug',
'--log-file=neutron-ns-metadata-proxy-%s.log' %
router_id
])
], addl_env=None)
])
finally:
self.external_process_p.start()

View File

@ -61,8 +61,8 @@ class TestProcessManager(base.BaseTestCase):
name.assert_called_once_with(ensure_pids_dir=True)
ip_lib.assert_has_calls([
mock.call.IPWrapper('sudo', 'ns'),
mock.call.IPWrapper().netns.execute(['the', 'cmd'])]
)
mock.call.IPWrapper().netns.execute(['the', 'cmd'],
addl_env=None)])
def test_enable_with_namespace_process_active(self):
callback = mock.Mock()