vmware-nsx/neutron/tests/unit/agent/linux/test_async_process.py
Maru Newby 991728400a Add support for managing async processes
Interacting with a long-running asynchronous process requires the
use of non-blocking io.  This change adds a helper class that can
launch a long-running process and read stdout and stderr in a
non-blocking fashion via eventlet.

This functionality is intended to support monitoring ovsdb via
a long-running and root-privileged invocation of ovsdb-client.

The complexity of the system interaction in this patch suggested
the addition of a functional test that validated actual behaviour.
The test was added under the neutron/tests/functional path which
is now included in the testr search path.

Partial-Bug: #1177973

Change-Id: I9969e556acecf7a9e77d873371cc2ec2647be011
2013-10-14 07:20:02 +00:00

240 lines
9.2 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet.event
import eventlet.queue
import eventlet.timeout
import mock
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
_marker = ()
class TestAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
self.proc = async_process.AsyncProcess(['fake'])
def test_construtor_raises_exception_for_negative_respawn_interval(self):
with testtools.ExpectedException(ValueError):
async_process.AsyncProcess(['fake'], respawn_interval=-1)
def test__spawn(self):
expected_process = 'Foo'
proc = self.proc
with mock.patch.object(utils, 'create_process') as mock_create_process:
mock_create_process.return_value = [expected_process, None]
with mock.patch('eventlet.spawn') as mock_spawn:
proc._spawn()
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
self.assertEqual(proc._process, expected_process)
mock_spawn.assert_has_calls([
mock.call(proc._watch_process,
proc._read_stdout,
proc._kill_event),
mock.call(proc._watch_process,
proc._read_stderr,
proc._kill_event),
])
self.assertEqual(len(proc._watchers), 2)
def test__handle_process_error_kills_with_respawn(self):
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=False))
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
with mock.patch.object(self.proc, '_kill') as kill:
with mock.patch.object(self.proc, '_spawn') as spawn:
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
kill.assert_has_calls(mock.call(respawning=True))
sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
spawn.assert_called_once()
def _test__watch_process(self, callback, kill_event):
self.proc._kill_event = kill_event
# Ensure the test times out eventually if the watcher loops endlessly
with eventlet.timeout.Timeout(5):
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(callback, kill_event)
if not kill_event.ready():
func.assert_called_once()
def test__watch_process_exits_on_callback_failure(self):
self._test__watch_process(lambda: False, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
raise Exception('Error!')
self._test__watch_process(foo, eventlet.event.Event())
def test__watch_process_exits_on_sent_kill_event(self):
kill_event = eventlet.event.Event()
kill_event.send()
self._test__watch_process(None, kill_event)
def _test_read_output_queues_and_returns_result(self, output):
queue = eventlet.queue.LightQueue()
mock_stream = mock.Mock()
with mock.patch.object(mock_stream, 'readline') as mock_readline:
mock_readline.return_value = output
result = self.proc._read(mock_stream, queue)
if output:
self.assertEqual(output, result)
self.assertEqual(output, queue.get_nowait())
else:
self.assertFalse(result)
self.assertTrue(queue.empty())
def test__read_queues_and_returns_output(self):
self._test_read_output_queues_and_returns_result('foo')
def test__read_returns_none_for_missing_output(self):
self._test_read_output_queues_and_returns_result('')
def test_start_raises_exception_if_process_already_started(self):
self.proc._kill_event = True
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.start()
def test_start_invokes__spawn(self):
with mock.patch.object(self.proc, '_spawn') as mock_start:
self.proc.start()
mock_start.assert_called_once()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
result = list(self.proc._iter_queue(queue))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
expected_value = 'foo'
with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
mock_iter_queue.return_value = expected_value
target_func = getattr(self.proc, 'iter_%s' % output_type, None)
value = target_func()
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
mock_iter_queue.assert_called_with(queue)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
def test_iter_stderr(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
with mock.patch.object(self.proc, '_get_pid_to_kill',
return_value=pid):
with mock.patch.object(self.proc,
'_kill_process') as mock_kill_process:
self.proc._kill(respawning)
if respawning:
self.assertIsNotNone(self.proc._kill_event)
else:
self.assertIsNone(self.proc._kill_event)
mock_kill_event.send.assert_called_once()
if pid:
mock_kill_process.assert_called_once(pid)
def test__kill_when_respawning_does_not_clear_kill_event(self):
self._test__kill(True)
def test__kill_when_not_respawning_clears_kill_event(self):
self._test__kill(False)
def test__kill_targets_process_for_pid(self):
self._test__kill(False, pid='1')
def _test__get_pid_to_kill(self, expected=_marker,
root_helper=None, pids=None):
if root_helper:
self.proc.root_helper = root_helper
with mock.patch.object(self.proc, '_process') as mock_process:
with mock.patch.object(mock_process, 'pid') as mock_pid:
with mock.patch.object(utils, 'find_child_pids',
return_value=pids):
actual = self.proc._get_pid_to_kill()
if expected is _marker:
expected = mock_pid
self.assertEqual(expected, actual)
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
self._test__get_pid_to_kill()
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a')
def test__get_pid_to_kill_returns_none_with_root_helper(self):
self._test__get_pid_to_kill(expected=None, root_helper='a')
def _test__kill_process(self, pid, expected, exception_message=None):
self.proc.root_helper = 'foo'
if exception_message:
exc = RuntimeError(exception_message)
else:
exc = None
with mock.patch.object(utils, 'execute',
side_effect=exc) as mock_execute:
actual = self.proc._kill_process(pid)
self.assertEqual(expected, actual)
mock_execute.assert_called_with(['kill', '-9', pid],
root_helper=self.proc.root_helper)
def test__kill_process_returns_true_for_valid_pid(self):
self._test__kill_process('1', True)
def test__kill_process_returns_true_for_stale_pid(self):
self._test__kill_process('1', True, 'No such process')
def test__kill_process_returns_false_for_execute_exception(self):
self._test__kill_process('1', False, 'Invalid')
def test_stop_calls_kill(self):
self.proc._kill_event = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop()
mock_kill.called_once()
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.stop()