Merge "Add support for managing async processes"

This commit is contained in:
Jenkins 2013-10-14 22:18:49 +00:00 committed by Gerrit Code Review
commit 110cae6543
12 changed files with 681 additions and 18 deletions

View File

@ -1,4 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

26
TESTING
View File

@ -8,12 +8,18 @@ Overview
the various pieces of the neutron tree to make sure any new changes
don't break existing functionality.
The functional tests are intended to validate actual system
interaction. Mocks should be used sparingly, if at all. Care
should be taken to ensure that existing system resources are not
modified and that resources created in tests are properly cleaned
up.
Running tests
There are two mechanisms for running tests: run_tests.sh and tox.
Before submitting a patch for review you should always ensure all unit
test pass; a tox run is triggered by the jenkins gate executed on gerrit
for each patch pushed for review.
Before submitting a patch for review you should always ensure all
test pass; a tox run is triggered by the jenkins gate executed on
gerrit for each patch pushed for review.
With both mechanisms you can either run the tests in the standard
environment or create a virtual environment to run them in.
@ -41,18 +47,18 @@ Running individual tests
Adding more tests
Neutron has a fast growing code base and there is plenty of areas that
need to be covered by unit tests.
need to be covered by unit and functional tests.
To get a grasp of the areas where unit tests are needed, you can check
To get a grasp of the areas where tests are needed, you can check
current coverage by running:
$ ./run_tests.sh -c
Development process
It is expected that any new changes that are proposed for merge come with
unit tests for that feature or code area. Ideally any bugs fixes that are
submitted also have unit tests to prove that they stay fixed!
In addition, before proposing for merge, all of the current unit tests
should be passing.
It is expected that any new changes that are proposed for merge
come with tests for that feature or code area. Ideally any bugs
fixes that are submitted also have tests to prove that they stay
fixed! In addition, before proposing for merge, all of the
current tests should be passing.

View File

@ -0,0 +1,214 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import eventlet.event
import eventlet.queue
import eventlet.timeout
from neutron.agent.linux import utils
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class AsyncProcessException(Exception):
pass
class AsyncProcess(object):
"""Manages an asynchronous process.
This class spawns a new process via subprocess and uses
greenthreads to read stderr and stdout asynchronously into queues
that can be read via repeatedly calling iter_stdout() and
iter_stderr().
If respawn_interval is non-zero, any error in communicating with
the managed process will result in the process and greenthreads
being cleaned up and the process restarted after the specified
interval.
Example usage:
>>> import time
>>> proc = AsyncProcess(['ping'])
>>> proc.start()
>>> time.sleep(5)
>>> proc.stop()
>>> for line in proc.iter_stdout():
... print line
"""
def __init__(self, cmd, root_helper=None, respawn_interval=None):
"""Constructor.
:param cmd: The list of command arguments to invoke.
:param root_helper: Optional, utility to use when running shell cmds.
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
"""
self.cmd = cmd
self.root_helper = root_helper
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
self._kill_event = None
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
self._watchers = []
def start(self):
"""Launch a process and monitor it asynchronously."""
if self._kill_event:
raise AsyncProcessException(_('Process is already started'))
else:
LOG.debug(_('Launching async process [%s].'), self.cmd)
self._spawn()
def stop(self):
"""Halt the process and watcher threads."""
if self._kill_event:
LOG.debug(_('Halting async process [%s].'), self.cmd)
self._kill()
else:
raise AsyncProcessException(_('Process is not running.'))
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self.cmd,
root_helper=self.root_helper)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
# Pass the stop event directly to the greenthread to
# ensure that assignment of a new event to the instance
# attribute does not prevent the greenthread from using
# the original event.
watcher = eventlet.spawn(self._watch_process,
reader,
self._kill_event)
self._watchers.append(watcher)
def _kill(self, respawning=False):
"""Kill the process and the associated watcher greenthreads.
:param respawning: Optional, whether respawn will be subsequently
attempted.
"""
# Halt the greenthreads
self._kill_event.send()
pid = self._get_pid_to_kill()
if pid:
self._kill_process(pid)
if not respawning:
# Clear the kill event to ensure the process can be
# explicitly started again.
self._kill_event = None
def _get_pid_to_kill(self):
pid = self._process.pid
# If root helper was used, two processes will be created:
#
# - a root helper process (e.g. sudo myscript)
# - a child process (e.g. myscript)
#
# Killing the root helper process will leave the child process
# as a zombie, so the only way to ensure that both die is to
# target the child process directly.
if self.root_helper:
pids = utils.find_child_pids(pid)
if pids:
# The root helper will only ever launch a single child.
pid = pids[0]
else:
# Process is already dead.
pid = None
return pid
def _kill_process(self, pid):
try:
# A process started by a root helper will be running as
# root and need to be killed via the same helper.
utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
except Exception as ex:
stale_pid = (isinstance(ex, RuntimeError) and
'No such process' in str(ex))
if not stale_pid:
LOG.exception(_('An error occurred while killing [%s].'),
self.cmd)
return False
return True
def _handle_process_error(self):
"""Kill the async process and respawn if necessary."""
LOG.debug(_('Halting async process [%s] in response to an error.'),
self.cmd)
respawning = self.respawn_interval >= 0
self._kill(respawning=respawning)
if respawning:
eventlet.sleep(self.respawn_interval)
LOG.debug(_('Respawning async process [%s].'), self.cmd)
self._spawn()
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
try:
if not callback():
break
except Exception:
LOG.exception(_('An error occured while communicating '
'with async process [%s].'), self.cmd)
break
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
# The kill event not being ready indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
if not kill_event.ready():
self._handle_process_error()
def _read(self, stream, queue):
data = stream.readline()
if data:
data = data.strip()
queue.put(data)
return data
def _read_stdout(self):
return self._read(self._process.stdout, self._stdout_lines)
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
def _iter_queue(self, queue):
while True:
try:
yield queue.get_nowait()
except eventlet.queue.Empty:
break
def iter_stdout(self):
return self._iter_queue(self._stdout_lines)
def iter_stderr(self):
return self._iter_queue(self._stderr_lines)

View File

@ -34,8 +34,12 @@ from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def execute(cmd, root_helper=None, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False):
def create_process(cmd, root_helper=None, addl_env=None):
"""Create a process object for the given command.
The return value will be a tuple of the process object and the
list of command arguments used to create it.
"""
if root_helper:
cmd = shlex.split(root_helper) + cmd
cmd = map(str, cmd)
@ -44,12 +48,21 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
env = os.environ.copy()
if addl_env:
env.update(addl_env)
try:
obj = utils.subprocess_popen(cmd, shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env)
return obj, cmd
def execute(cmd, root_helper=None, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False):
try:
obj, cmd = create_process(cmd, root_helper=root_helper,
addl_env=addl_env)
_stdout, _stderr = (process_input and
obj.communicate(process_input) or
obj.communicate())
@ -95,3 +108,17 @@ def replace_file(file_name, data):
tmp_file.close()
os.chmod(tmp_file.name, 0o644)
os.rename(tmp_file.name, file_name)
def find_child_pids(pid):
"""Retrieve a list of the pids of child processes of the given pid."""
try:
raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='])
except RuntimeError as e:
# Exception has already been logged by execute
no_children_found = 'Exit code: 1' in str(e)
if no_children_found:
return []
# Unexpected errors are the responsibility of the caller
raise
return [x.strip() for x in raw_pids.split('\n') if x.strip()]

View File

@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet
import eventlet.timeout
import fixtures
from neutron.agent.linux import async_process
from neutron.tests import base
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
self.test_file_path = self.useFixture(
fixtures.TempDir()).join("test_async_process.tmp")
self.data = [str(x) for x in xrange(4)]
with file(self.test_file_path, 'w') as f:
f.writelines('%s\n' % item for item in self.data)
def _check_stdout(self, proc):
# Ensure that all the output from the file is read
output = []
while output != self.data:
new_output = list(proc.iter_stdout())
if new_output:
output += new_output
eventlet.sleep(0.01)
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.timeout.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def test_stopping_async_process_lifecycle(self):
with self.assert_max_execution_time():
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
proc.start()
self._check_stdout(proc)
proc.stop()
# Ensure that the process and greenthreads have stopped
proc._process.wait()
self.assertEqual(proc._process.returncode, -9)
for watcher in proc._watchers:
watcher.wait()
def test_async_process_respawns(self):
with self.assert_max_execution_time():
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path],
respawn_interval=0)
proc.start()
# Ensure that the same output is read twice
self._check_stdout(proc)
pid = proc._get_pid_to_kill()
proc._kill_process(pid)
self._check_stdout(proc)
proc.stop()

View File

@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,239 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet.event
import eventlet.queue
import eventlet.timeout
import mock
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
_marker = ()
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
self.proc = async_process.AsyncProcess(['fake'])
def test_construtor_raises_exception_for_negative_respawn_interval(self):
with testtools.ExpectedException(ValueError):
async_process.AsyncProcess(['fake'], respawn_interval=-1)
def test__spawn(self):
expected_process = 'Foo'
proc = self.proc
with mock.patch.object(utils, 'create_process') as mock_create_process:
mock_create_process.return_value = [expected_process, None]
with mock.patch('eventlet.spawn') as mock_spawn:
proc._spawn()
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
self.assertEqual(proc._process, expected_process)
mock_spawn.assert_has_calls([
mock.call(proc._watch_process,
proc._read_stdout,
proc._kill_event),
mock.call(proc._watch_process,
proc._read_stderr,
proc._kill_event),
])
self.assertEqual(len(proc._watchers), 2)
def test__handle_process_error_kills_with_respawn(self):
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=False))
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
with mock.patch.object(self.proc, '_kill') as kill:
with mock.patch.object(self.proc, '_spawn') as spawn:
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=True))
sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
spawn.assert_called_once()
def _test__watch_process(self, callback, kill_event):
self.proc._kill_event = kill_event
# Ensure the test times out eventually if the watcher loops endlessly
with eventlet.timeout.Timeout(5):
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(callback, kill_event)
if not kill_event.ready():
func.assert_called_once()
def test__watch_process_exits_on_callback_failure(self):
self._test__watch_process(lambda: False, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
raise Exception('Error!')
self._test__watch_process(foo, eventlet.event.Event())
def test__watch_process_exits_on_sent_kill_event(self):
kill_event = eventlet.event.Event()
kill_event.send()
self._test__watch_process(None, kill_event)
def _test_read_output_queues_and_returns_result(self, output):
queue = eventlet.queue.LightQueue()
mock_stream = mock.Mock()
with mock.patch.object(mock_stream, 'readline') as mock_readline:
mock_readline.return_value = output
result = self.proc._read(mock_stream, queue)
if output:
self.assertEqual(output, result)
self.assertEqual(output, queue.get_nowait())
else:
self.assertFalse(result)
self.assertTrue(queue.empty())
def test__read_queues_and_returns_output(self):
self._test_read_output_queues_and_returns_result('foo')
def test__read_returns_none_for_missing_output(self):
self._test_read_output_queues_and_returns_result('')
def test_start_raises_exception_if_process_already_started(self):
self.proc._kill_event = True
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.start()
def test_start_invokes__spawn(self):
with mock.patch.object(self.proc, '_spawn') as mock_start:
self.proc.start()
mock_start.assert_called_once()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
result = list(self.proc._iter_queue(queue))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
expected_value = 'foo'
with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
mock_iter_queue.return_value = expected_value
target_func = getattr(self.proc, 'iter_%s' % output_type, None)
value = target_func()
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
mock_iter_queue.assert_called_with(queue)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
def test_iter_stderr(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
with mock.patch.object(self.proc, '_get_pid_to_kill',
return_value=pid):
with mock.patch.object(self.proc,
'_kill_process') as mock_kill_process:
self.proc._kill(respawning)
if respawning:
self.assertIsNotNone(self.proc._kill_event)
else:
self.assertIsNone(self.proc._kill_event)
mock_kill_event.send.assert_called_once()
if pid:
mock_kill_process.assert_called_once(pid)
def test__kill_when_respawning_does_not_clear_kill_event(self):
self._test__kill(True)
def test__kill_when_not_respawning_clears_kill_event(self):
self._test__kill(False)
def test__kill_targets_process_for_pid(self):
self._test__kill(False, pid='1')
def _test__get_pid_to_kill(self, expected=_marker,
root_helper=None, pids=None):
if root_helper:
self.proc.root_helper = root_helper
with mock.patch.object(self.proc, '_process') as mock_process:
with mock.patch.object(mock_process, 'pid') as mock_pid:
with mock.patch.object(utils, 'find_child_pids',
return_value=pids):
actual = self.proc._get_pid_to_kill()
if expected is _marker:
expected = mock_pid
self.assertEqual(expected, actual)
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
self._test__get_pid_to_kill()
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a')
def test__get_pid_to_kill_returns_none_with_root_helper(self):
self._test__get_pid_to_kill(expected=None, root_helper='a')
def _test__kill_process(self, pid, expected, exception_message=None):
self.proc.root_helper = 'foo'
if exception_message:
exc = RuntimeError(exception_message)
else:
exc = None
with mock.patch.object(utils, 'execute',
side_effect=exc) as mock_execute:
actual = self.proc._kill_process(pid)
self.assertEqual(expected, actual)
mock_execute.assert_called_with(['kill', '-9', pid],
root_helper=self.proc.root_helper)
def test__kill_process_returns_true_for_valid_pid(self):
self._test__kill_process('1', True)
def test__kill_process_returns_true_for_stale_pid(self):
self._test__kill_process('1', True, 'No such process')
def test__kill_process_returns_false_for_execute_exception(self):
self._test__kill_process('1', False, 'Invalid')
def test_stop_calls_kill(self):
self.proc._kill_event = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop()
mock_kill.called_once()
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.stop()

View File

@ -17,6 +17,7 @@
import fixtures
import mock
import testtools
from neutron.agent.linux import utils
from neutron.tests import base
@ -106,3 +107,25 @@ class AgentUtilsReplaceFile(base.BaseTestCase):
ntf.assert_has_calls(expected)
chmod.assert_called_once_with('/baz', 0o644)
rename.assert_called_once_with('/baz', '/foo')
class TestFindChildPids(base.BaseTestCase):
def test_returns_empty_list_for_exit_code_1(self):
with mock.patch.object(utils, 'execute',
side_effect=RuntimeError('Exit code: 1')):
self.assertEqual(utils.find_child_pids(-1), [])
def test_returns_empty_list_for_no_output(self):
with mock.patch.object(utils, 'execute', return_value=''):
self.assertEqual(utils.find_child_pids(-1), [])
def test_returns_list_of_child_process_ids_for_good_ouput(self):
with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
self.assertEqual(utils.find_child_pids(-1), ['123', '185'])
def test_raises_unknown_exception(self):
with testtools.ExpectedException(RuntimeError):
with mock.patch.object(utils, 'execute',
side_effect=RuntimeError()):
utils.find_child_pids(-1)