Add support for managing async processes
Interacting with a long-running asynchronous process requires the use of non-blocking io. This change adds a helper class that can launch a long-running process and read stdout and stderr in a non-blocking fashion via eventlet. This functionality is intended to support monitoring ovsdb via a long-running and root-privileged invocation of ovsdb-client. The complexity of the system interaction in this patch suggested the addition of a functional test that validated actual behaviour. The test was added under the neutron/tests/functional path which is now included in the testr search path. Partial-Bug: #1177973 Change-Id: I9969e556acecf7a9e77d873371cc2ec2647be011
This commit is contained in:
parent
077a880ce0
commit
991728400a
@ -1,4 +1,4 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION
|
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION
|
||||||
test_id_option=--load-list $IDFILE
|
test_id_option=--load-list $IDFILE
|
||||||
test_list_option=--list
|
test_list_option=--list
|
||||||
|
26
TESTING
26
TESTING
@ -8,12 +8,18 @@ Overview
|
|||||||
the various pieces of the neutron tree to make sure any new changes
|
the various pieces of the neutron tree to make sure any new changes
|
||||||
don't break existing functionality.
|
don't break existing functionality.
|
||||||
|
|
||||||
|
The functional tests are intended to validate actual system
|
||||||
|
interaction. Mocks should be used sparingly, if at all. Care
|
||||||
|
should be taken to ensure that existing system resources are not
|
||||||
|
modified and that resources created in tests are properly cleaned
|
||||||
|
up.
|
||||||
|
|
||||||
Running tests
|
Running tests
|
||||||
|
|
||||||
There are two mechanisms for running tests: run_tests.sh and tox.
|
There are two mechanisms for running tests: run_tests.sh and tox.
|
||||||
Before submitting a patch for review you should always ensure all unit
|
Before submitting a patch for review you should always ensure all
|
||||||
test pass; a tox run is triggered by the jenkins gate executed on gerrit
|
test pass; a tox run is triggered by the jenkins gate executed on
|
||||||
for each patch pushed for review.
|
gerrit for each patch pushed for review.
|
||||||
|
|
||||||
With both mechanisms you can either run the tests in the standard
|
With both mechanisms you can either run the tests in the standard
|
||||||
environment or create a virtual environment to run them in.
|
environment or create a virtual environment to run them in.
|
||||||
@ -41,18 +47,18 @@ Running individual tests
|
|||||||
Adding more tests
|
Adding more tests
|
||||||
|
|
||||||
Neutron has a fast growing code base and there is plenty of areas that
|
Neutron has a fast growing code base and there is plenty of areas that
|
||||||
need to be covered by unit tests.
|
need to be covered by unit and functional tests.
|
||||||
|
|
||||||
To get a grasp of the areas where unit tests are needed, you can check
|
To get a grasp of the areas where tests are needed, you can check
|
||||||
current coverage by running:
|
current coverage by running:
|
||||||
|
|
||||||
$ ./run_tests.sh -c
|
$ ./run_tests.sh -c
|
||||||
|
|
||||||
Development process
|
Development process
|
||||||
|
|
||||||
It is expected that any new changes that are proposed for merge come with
|
It is expected that any new changes that are proposed for merge
|
||||||
unit tests for that feature or code area. Ideally any bugs fixes that are
|
come with tests for that feature or code area. Ideally any bugs
|
||||||
submitted also have unit tests to prove that they stay fixed!
|
fixes that are submitted also have tests to prove that they stay
|
||||||
In addition, before proposing for merge, all of the current unit tests
|
fixed! In addition, before proposing for merge, all of the
|
||||||
should be passing.
|
current tests should be passing.
|
||||||
|
|
||||||
|
214
neutron/agent/linux/async_process.py
Normal file
214
neutron/agent/linux/async_process.py
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
import eventlet.event
|
||||||
|
import eventlet.queue
|
||||||
|
import eventlet.timeout
|
||||||
|
|
||||||
|
from neutron.agent.linux import utils
|
||||||
|
from neutron.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncProcessException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncProcess(object):
|
||||||
|
"""Manages an asynchronous process.
|
||||||
|
|
||||||
|
This class spawns a new process via subprocess and uses
|
||||||
|
greenthreads to read stderr and stdout asynchronously into queues
|
||||||
|
that can be read via repeatedly calling iter_stdout() and
|
||||||
|
iter_stderr().
|
||||||
|
|
||||||
|
If respawn_interval is non-zero, any error in communicating with
|
||||||
|
the managed process will result in the process and greenthreads
|
||||||
|
being cleaned up and the process restarted after the specified
|
||||||
|
interval.
|
||||||
|
|
||||||
|
Example usage:
|
||||||
|
|
||||||
|
>>> import time
|
||||||
|
>>> proc = AsyncProcess(['ping'])
|
||||||
|
>>> proc.start()
|
||||||
|
>>> time.sleep(5)
|
||||||
|
>>> proc.stop()
|
||||||
|
>>> for line in proc.iter_stdout():
|
||||||
|
... print line
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cmd, root_helper=None, respawn_interval=None):
|
||||||
|
"""Constructor.
|
||||||
|
|
||||||
|
:param cmd: The list of command arguments to invoke.
|
||||||
|
:param root_helper: Optional, utility to use when running shell cmds.
|
||||||
|
:param respawn_interval: Optional, the interval in seconds to wait
|
||||||
|
to respawn after unexpected process death. Respawn will
|
||||||
|
only be attempted if a value of 0 or greater is provided.
|
||||||
|
"""
|
||||||
|
self.cmd = cmd
|
||||||
|
self.root_helper = root_helper
|
||||||
|
if respawn_interval is not None and respawn_interval < 0:
|
||||||
|
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
|
||||||
|
self.respawn_interval = respawn_interval
|
||||||
|
self._process = None
|
||||||
|
self._kill_event = None
|
||||||
|
self._stdout_lines = eventlet.queue.LightQueue()
|
||||||
|
self._stderr_lines = eventlet.queue.LightQueue()
|
||||||
|
self._watchers = []
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Launch a process and monitor it asynchronously."""
|
||||||
|
if self._kill_event:
|
||||||
|
raise AsyncProcessException(_('Process is already started'))
|
||||||
|
else:
|
||||||
|
LOG.debug(_('Launching async process [%s].'), self.cmd)
|
||||||
|
self._spawn()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Halt the process and watcher threads."""
|
||||||
|
if self._kill_event:
|
||||||
|
LOG.debug(_('Halting async process [%s].'), self.cmd)
|
||||||
|
self._kill()
|
||||||
|
else:
|
||||||
|
raise AsyncProcessException(_('Process is not running.'))
|
||||||
|
|
||||||
|
def _spawn(self):
|
||||||
|
"""Spawn a process and its watchers."""
|
||||||
|
self._kill_event = eventlet.event.Event()
|
||||||
|
self._process, cmd = utils.create_process(self.cmd,
|
||||||
|
root_helper=self.root_helper)
|
||||||
|
self._watchers = []
|
||||||
|
for reader in (self._read_stdout, self._read_stderr):
|
||||||
|
# Pass the stop event directly to the greenthread to
|
||||||
|
# ensure that assignment of a new event to the instance
|
||||||
|
# attribute does not prevent the greenthread from using
|
||||||
|
# the original event.
|
||||||
|
watcher = eventlet.spawn(self._watch_process,
|
||||||
|
reader,
|
||||||
|
self._kill_event)
|
||||||
|
self._watchers.append(watcher)
|
||||||
|
|
||||||
|
def _kill(self, respawning=False):
|
||||||
|
"""Kill the process and the associated watcher greenthreads.
|
||||||
|
|
||||||
|
:param respawning: Optional, whether respawn will be subsequently
|
||||||
|
attempted.
|
||||||
|
"""
|
||||||
|
# Halt the greenthreads
|
||||||
|
self._kill_event.send()
|
||||||
|
|
||||||
|
pid = self._get_pid_to_kill()
|
||||||
|
if pid:
|
||||||
|
self._kill_process(pid)
|
||||||
|
|
||||||
|
if not respawning:
|
||||||
|
# Clear the kill event to ensure the process can be
|
||||||
|
# explicitly started again.
|
||||||
|
self._kill_event = None
|
||||||
|
|
||||||
|
def _get_pid_to_kill(self):
|
||||||
|
pid = self._process.pid
|
||||||
|
# If root helper was used, two processes will be created:
|
||||||
|
#
|
||||||
|
# - a root helper process (e.g. sudo myscript)
|
||||||
|
# - a child process (e.g. myscript)
|
||||||
|
#
|
||||||
|
# Killing the root helper process will leave the child process
|
||||||
|
# as a zombie, so the only way to ensure that both die is to
|
||||||
|
# target the child process directly.
|
||||||
|
if self.root_helper:
|
||||||
|
pids = utils.find_child_pids(pid)
|
||||||
|
if pids:
|
||||||
|
# The root helper will only ever launch a single child.
|
||||||
|
pid = pids[0]
|
||||||
|
else:
|
||||||
|
# Process is already dead.
|
||||||
|
pid = None
|
||||||
|
return pid
|
||||||
|
|
||||||
|
def _kill_process(self, pid):
|
||||||
|
try:
|
||||||
|
# A process started by a root helper will be running as
|
||||||
|
# root and need to be killed via the same helper.
|
||||||
|
utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
|
||||||
|
except Exception as ex:
|
||||||
|
stale_pid = (isinstance(ex, RuntimeError) and
|
||||||
|
'No such process' in str(ex))
|
||||||
|
if not stale_pid:
|
||||||
|
LOG.exception(_('An error occurred while killing [%s].'),
|
||||||
|
self.cmd)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_process_error(self):
|
||||||
|
"""Kill the async process and respawn if necessary."""
|
||||||
|
LOG.debug(_('Halting async process [%s] in response to an error.'),
|
||||||
|
self.cmd)
|
||||||
|
respawning = self.respawn_interval >= 0
|
||||||
|
self._kill(respawning=respawning)
|
||||||
|
if respawning:
|
||||||
|
eventlet.sleep(self.respawn_interval)
|
||||||
|
LOG.debug(_('Respawning async process [%s].'), self.cmd)
|
||||||
|
self._spawn()
|
||||||
|
|
||||||
|
def _watch_process(self, callback, kill_event):
|
||||||
|
while not kill_event.ready():
|
||||||
|
try:
|
||||||
|
if not callback():
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_('An error occured while communicating '
|
||||||
|
'with async process [%s].'), self.cmd)
|
||||||
|
break
|
||||||
|
# Ensure that watching a process with lots of output does
|
||||||
|
# not block execution of other greenthreads.
|
||||||
|
eventlet.sleep()
|
||||||
|
# The kill event not being ready indicates that the loop was
|
||||||
|
# broken out of due to an error in the watched process rather
|
||||||
|
# than the loop condition being satisfied.
|
||||||
|
if not kill_event.ready():
|
||||||
|
self._handle_process_error()
|
||||||
|
|
||||||
|
def _read(self, stream, queue):
|
||||||
|
data = stream.readline()
|
||||||
|
if data:
|
||||||
|
data = data.strip()
|
||||||
|
queue.put(data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _read_stdout(self):
|
||||||
|
return self._read(self._process.stdout, self._stdout_lines)
|
||||||
|
|
||||||
|
def _read_stderr(self):
|
||||||
|
return self._read(self._process.stderr, self._stderr_lines)
|
||||||
|
|
||||||
|
def _iter_queue(self, queue):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
yield queue.get_nowait()
|
||||||
|
except eventlet.queue.Empty:
|
||||||
|
break
|
||||||
|
|
||||||
|
def iter_stdout(self):
|
||||||
|
return self._iter_queue(self._stdout_lines)
|
||||||
|
|
||||||
|
def iter_stderr(self):
|
||||||
|
return self._iter_queue(self._stderr_lines)
|
@ -34,8 +34,12 @@ from neutron.openstack.common import log as logging
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def execute(cmd, root_helper=None, process_input=None, addl_env=None,
|
def create_process(cmd, root_helper=None, addl_env=None):
|
||||||
check_exit_code=True, return_stderr=False):
|
"""Create a process object for the given command.
|
||||||
|
|
||||||
|
The return value will be a tuple of the process object and the
|
||||||
|
list of command arguments used to create it.
|
||||||
|
"""
|
||||||
if root_helper:
|
if root_helper:
|
||||||
cmd = shlex.split(root_helper) + cmd
|
cmd = shlex.split(root_helper) + cmd
|
||||||
cmd = map(str, cmd)
|
cmd = map(str, cmd)
|
||||||
@ -44,12 +48,21 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
|
|||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
if addl_env:
|
if addl_env:
|
||||||
env.update(addl_env)
|
env.update(addl_env)
|
||||||
|
|
||||||
|
obj = utils.subprocess_popen(cmd, shell=False,
|
||||||
|
stdin=subprocess.PIPE,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
env=env)
|
||||||
|
|
||||||
|
return obj, cmd
|
||||||
|
|
||||||
|
|
||||||
|
def execute(cmd, root_helper=None, process_input=None, addl_env=None,
|
||||||
|
check_exit_code=True, return_stderr=False):
|
||||||
try:
|
try:
|
||||||
obj = utils.subprocess_popen(cmd, shell=False,
|
obj, cmd = create_process(cmd, root_helper=root_helper,
|
||||||
stdin=subprocess.PIPE,
|
addl_env=addl_env)
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.PIPE,
|
|
||||||
env=env)
|
|
||||||
_stdout, _stderr = (process_input and
|
_stdout, _stderr = (process_input and
|
||||||
obj.communicate(process_input) or
|
obj.communicate(process_input) or
|
||||||
obj.communicate())
|
obj.communicate())
|
||||||
@ -95,3 +108,17 @@ def replace_file(file_name, data):
|
|||||||
tmp_file.close()
|
tmp_file.close()
|
||||||
os.chmod(tmp_file.name, 0o644)
|
os.chmod(tmp_file.name, 0o644)
|
||||||
os.rename(tmp_file.name, file_name)
|
os.rename(tmp_file.name, file_name)
|
||||||
|
|
||||||
|
|
||||||
|
def find_child_pids(pid):
|
||||||
|
"""Retrieve a list of the pids of child processes of the given pid."""
|
||||||
|
try:
|
||||||
|
raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='])
|
||||||
|
except RuntimeError as e:
|
||||||
|
# Exception has already been logged by execute
|
||||||
|
no_children_found = 'Exit code: 1' in str(e)
|
||||||
|
if no_children_found:
|
||||||
|
return []
|
||||||
|
# Unexpected errors are the responsibility of the caller
|
||||||
|
raise
|
||||||
|
return [x.strip() for x in raw_pids.split('\n') if x.strip()]
|
||||||
|
15
neutron/tests/functional/__init__.py
Normal file
15
neutron/tests/functional/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
15
neutron/tests/functional/agent/__init__.py
Normal file
15
neutron/tests/functional/agent/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
15
neutron/tests/functional/agent/linux/__init__.py
Normal file
15
neutron/tests/functional/agent/linux/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
79
neutron/tests/functional/agent/linux/test_async_process.py
Normal file
79
neutron/tests/functional/agent/linux/test_async_process.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
import eventlet.timeout
|
||||||
|
import fixtures
|
||||||
|
|
||||||
|
from neutron.agent.linux import async_process
|
||||||
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncProcess(base.BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAsyncProcess, self).setUp()
|
||||||
|
self.test_file_path = self.useFixture(
|
||||||
|
fixtures.TempDir()).join("test_async_process.tmp")
|
||||||
|
self.data = [str(x) for x in xrange(4)]
|
||||||
|
with file(self.test_file_path, 'w') as f:
|
||||||
|
f.writelines('%s\n' % item for item in self.data)
|
||||||
|
|
||||||
|
def _check_stdout(self, proc):
|
||||||
|
# Ensure that all the output from the file is read
|
||||||
|
output = []
|
||||||
|
while output != self.data:
|
||||||
|
new_output = list(proc.iter_stdout())
|
||||||
|
if new_output:
|
||||||
|
output += new_output
|
||||||
|
eventlet.sleep(0.01)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def assert_max_execution_time(self, max_execution_time=5):
|
||||||
|
with eventlet.timeout.Timeout(max_execution_time, False):
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
self.fail('Execution of this test timed out')
|
||||||
|
|
||||||
|
def test_stopping_async_process_lifecycle(self):
|
||||||
|
with self.assert_max_execution_time():
|
||||||
|
proc = async_process.AsyncProcess(['tail', '-f',
|
||||||
|
self.test_file_path])
|
||||||
|
proc.start()
|
||||||
|
self._check_stdout(proc)
|
||||||
|
proc.stop()
|
||||||
|
|
||||||
|
# Ensure that the process and greenthreads have stopped
|
||||||
|
proc._process.wait()
|
||||||
|
self.assertEqual(proc._process.returncode, -9)
|
||||||
|
for watcher in proc._watchers:
|
||||||
|
watcher.wait()
|
||||||
|
|
||||||
|
def test_async_process_respawns(self):
|
||||||
|
with self.assert_max_execution_time():
|
||||||
|
proc = async_process.AsyncProcess(['tail', '-f',
|
||||||
|
self.test_file_path],
|
||||||
|
respawn_interval=0)
|
||||||
|
proc.start()
|
||||||
|
|
||||||
|
# Ensure that the same output is read twice
|
||||||
|
self._check_stdout(proc)
|
||||||
|
pid = proc._get_pid_to_kill()
|
||||||
|
proc._kill_process(pid)
|
||||||
|
self._check_stdout(proc)
|
||||||
|
proc.stop()
|
15
neutron/tests/unit/agent/__init__.py
Normal file
15
neutron/tests/unit/agent/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
15
neutron/tests/unit/agent/linux/__init__.py
Normal file
15
neutron/tests/unit/agent/linux/__init__.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
239
neutron/tests/unit/agent/linux/test_async_process.py
Normal file
239
neutron/tests/unit/agent/linux/test_async_process.py
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet.event
|
||||||
|
import eventlet.queue
|
||||||
|
import eventlet.timeout
|
||||||
|
import mock
|
||||||
|
import testtools
|
||||||
|
|
||||||
|
from neutron.agent.linux import async_process
|
||||||
|
from neutron.agent.linux import utils
|
||||||
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
_marker = ()
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncProcess(base.BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAsyncProcess, self).setUp()
|
||||||
|
self.proc = async_process.AsyncProcess(['fake'])
|
||||||
|
|
||||||
|
def test_construtor_raises_exception_for_negative_respawn_interval(self):
|
||||||
|
with testtools.ExpectedException(ValueError):
|
||||||
|
async_process.AsyncProcess(['fake'], respawn_interval=-1)
|
||||||
|
|
||||||
|
def test__spawn(self):
|
||||||
|
expected_process = 'Foo'
|
||||||
|
proc = self.proc
|
||||||
|
with mock.patch.object(utils, 'create_process') as mock_create_process:
|
||||||
|
mock_create_process.return_value = [expected_process, None]
|
||||||
|
with mock.patch('eventlet.spawn') as mock_spawn:
|
||||||
|
proc._spawn()
|
||||||
|
|
||||||
|
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
|
||||||
|
self.assertEqual(proc._process, expected_process)
|
||||||
|
mock_spawn.assert_has_calls([
|
||||||
|
mock.call(proc._watch_process,
|
||||||
|
proc._read_stdout,
|
||||||
|
proc._kill_event),
|
||||||
|
mock.call(proc._watch_process,
|
||||||
|
proc._read_stderr,
|
||||||
|
proc._kill_event),
|
||||||
|
])
|
||||||
|
self.assertEqual(len(proc._watchers), 2)
|
||||||
|
|
||||||
|
def test__handle_process_error_kills_with_respawn(self):
|
||||||
|
with mock.patch.object(self.proc, '_kill') as kill:
|
||||||
|
self.proc._handle_process_error()
|
||||||
|
|
||||||
|
kill.assert_has_calls(mock.call(respawning=False))
|
||||||
|
|
||||||
|
def test__handle_process_error_kills_without_respawn(self):
|
||||||
|
self.proc.respawn_interval = 1
|
||||||
|
with mock.patch.object(self.proc, '_kill') as kill:
|
||||||
|
with mock.patch.object(self.proc, '_spawn') as spawn:
|
||||||
|
with mock.patch('eventlet.sleep') as sleep:
|
||||||
|
self.proc._handle_process_error()
|
||||||
|
|
||||||
|
kill.assert_has_calls(mock.call(respawning=True))
|
||||||
|
sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
|
||||||
|
spawn.assert_called_once()
|
||||||
|
|
||||||
|
def _test__watch_process(self, callback, kill_event):
|
||||||
|
self.proc._kill_event = kill_event
|
||||||
|
# Ensure the test times out eventually if the watcher loops endlessly
|
||||||
|
with eventlet.timeout.Timeout(5):
|
||||||
|
with mock.patch.object(self.proc,
|
||||||
|
'_handle_process_error') as func:
|
||||||
|
self.proc._watch_process(callback, kill_event)
|
||||||
|
|
||||||
|
if not kill_event.ready():
|
||||||
|
func.assert_called_once()
|
||||||
|
|
||||||
|
def test__watch_process_exits_on_callback_failure(self):
|
||||||
|
self._test__watch_process(lambda: False, eventlet.event.Event())
|
||||||
|
|
||||||
|
def test__watch_process_exits_on_exception(self):
|
||||||
|
def foo():
|
||||||
|
raise Exception('Error!')
|
||||||
|
self._test__watch_process(foo, eventlet.event.Event())
|
||||||
|
|
||||||
|
def test__watch_process_exits_on_sent_kill_event(self):
|
||||||
|
kill_event = eventlet.event.Event()
|
||||||
|
kill_event.send()
|
||||||
|
self._test__watch_process(None, kill_event)
|
||||||
|
|
||||||
|
def _test_read_output_queues_and_returns_result(self, output):
|
||||||
|
queue = eventlet.queue.LightQueue()
|
||||||
|
mock_stream = mock.Mock()
|
||||||
|
with mock.patch.object(mock_stream, 'readline') as mock_readline:
|
||||||
|
mock_readline.return_value = output
|
||||||
|
result = self.proc._read(mock_stream, queue)
|
||||||
|
|
||||||
|
if output:
|
||||||
|
self.assertEqual(output, result)
|
||||||
|
self.assertEqual(output, queue.get_nowait())
|
||||||
|
else:
|
||||||
|
self.assertFalse(result)
|
||||||
|
self.assertTrue(queue.empty())
|
||||||
|
|
||||||
|
def test__read_queues_and_returns_output(self):
|
||||||
|
self._test_read_output_queues_and_returns_result('foo')
|
||||||
|
|
||||||
|
def test__read_returns_none_for_missing_output(self):
|
||||||
|
self._test_read_output_queues_and_returns_result('')
|
||||||
|
|
||||||
|
def test_start_raises_exception_if_process_already_started(self):
|
||||||
|
self.proc._kill_event = True
|
||||||
|
with testtools.ExpectedException(async_process.AsyncProcessException):
|
||||||
|
self.proc.start()
|
||||||
|
|
||||||
|
def test_start_invokes__spawn(self):
|
||||||
|
with mock.patch.object(self.proc, '_spawn') as mock_start:
|
||||||
|
self.proc.start()
|
||||||
|
|
||||||
|
mock_start.assert_called_once()
|
||||||
|
|
||||||
|
def test__iter_queue_returns_empty_list_for_empty_queue(self):
|
||||||
|
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test__iter_queue_returns_queued_data(self):
|
||||||
|
queue = eventlet.queue.LightQueue()
|
||||||
|
queue.put('foo')
|
||||||
|
result = list(self.proc._iter_queue(queue))
|
||||||
|
self.assertEqual(result, ['foo'])
|
||||||
|
|
||||||
|
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
|
||||||
|
expected_value = 'foo'
|
||||||
|
with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue:
|
||||||
|
mock_iter_queue.return_value = expected_value
|
||||||
|
target_func = getattr(self.proc, 'iter_%s' % output_type, None)
|
||||||
|
value = target_func()
|
||||||
|
|
||||||
|
self.assertEqual(value, expected_value)
|
||||||
|
queue = getattr(self.proc, '_%s_lines' % output_type, None)
|
||||||
|
mock_iter_queue.assert_called_with(queue)
|
||||||
|
|
||||||
|
def test_iter_stdout(self):
|
||||||
|
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
|
||||||
|
|
||||||
|
def test_iter_stderr(self):
|
||||||
|
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
|
||||||
|
|
||||||
|
def _test__kill(self, respawning, pid=None):
|
||||||
|
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event:
|
||||||
|
with mock.patch.object(self.proc, '_get_pid_to_kill',
|
||||||
|
return_value=pid):
|
||||||
|
with mock.patch.object(self.proc,
|
||||||
|
'_kill_process') as mock_kill_process:
|
||||||
|
self.proc._kill(respawning)
|
||||||
|
|
||||||
|
if respawning:
|
||||||
|
self.assertIsNotNone(self.proc._kill_event)
|
||||||
|
else:
|
||||||
|
self.assertIsNone(self.proc._kill_event)
|
||||||
|
|
||||||
|
mock_kill_event.send.assert_called_once()
|
||||||
|
if pid:
|
||||||
|
mock_kill_process.assert_called_once(pid)
|
||||||
|
|
||||||
|
def test__kill_when_respawning_does_not_clear_kill_event(self):
|
||||||
|
self._test__kill(True)
|
||||||
|
|
||||||
|
def test__kill_when_not_respawning_clears_kill_event(self):
|
||||||
|
self._test__kill(False)
|
||||||
|
|
||||||
|
def test__kill_targets_process_for_pid(self):
|
||||||
|
self._test__kill(False, pid='1')
|
||||||
|
|
||||||
|
def _test__get_pid_to_kill(self, expected=_marker,
|
||||||
|
root_helper=None, pids=None):
|
||||||
|
if root_helper:
|
||||||
|
self.proc.root_helper = root_helper
|
||||||
|
with mock.patch.object(self.proc, '_process') as mock_process:
|
||||||
|
with mock.patch.object(mock_process, 'pid') as mock_pid:
|
||||||
|
with mock.patch.object(utils, 'find_child_pids',
|
||||||
|
return_value=pids):
|
||||||
|
actual = self.proc._get_pid_to_kill()
|
||||||
|
if expected is _marker:
|
||||||
|
expected = mock_pid
|
||||||
|
self.assertEqual(expected, actual)
|
||||||
|
|
||||||
|
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self):
|
||||||
|
self._test__get_pid_to_kill()
|
||||||
|
|
||||||
|
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self):
|
||||||
|
self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a')
|
||||||
|
|
||||||
|
def test__get_pid_to_kill_returns_none_with_root_helper(self):
|
||||||
|
self._test__get_pid_to_kill(expected=None, root_helper='a')
|
||||||
|
|
||||||
|
def _test__kill_process(self, pid, expected, exception_message=None):
|
||||||
|
self.proc.root_helper = 'foo'
|
||||||
|
if exception_message:
|
||||||
|
exc = RuntimeError(exception_message)
|
||||||
|
else:
|
||||||
|
exc = None
|
||||||
|
with mock.patch.object(utils, 'execute',
|
||||||
|
side_effect=exc) as mock_execute:
|
||||||
|
actual = self.proc._kill_process(pid)
|
||||||
|
|
||||||
|
self.assertEqual(expected, actual)
|
||||||
|
mock_execute.assert_called_with(['kill', '-9', pid],
|
||||||
|
root_helper=self.proc.root_helper)
|
||||||
|
|
||||||
|
def test__kill_process_returns_true_for_valid_pid(self):
|
||||||
|
self._test__kill_process('1', True)
|
||||||
|
|
||||||
|
def test__kill_process_returns_true_for_stale_pid(self):
|
||||||
|
self._test__kill_process('1', True, 'No such process')
|
||||||
|
|
||||||
|
def test__kill_process_returns_false_for_execute_exception(self):
|
||||||
|
self._test__kill_process('1', False, 'Invalid')
|
||||||
|
|
||||||
|
def test_stop_calls_kill(self):
|
||||||
|
self.proc._kill_event = True
|
||||||
|
with mock.patch.object(self.proc, '_kill') as mock_kill:
|
||||||
|
self.proc.stop()
|
||||||
|
mock_kill.called_once()
|
||||||
|
|
||||||
|
def test_stop_raises_exception_if_already_started(self):
|
||||||
|
with testtools.ExpectedException(async_process.AsyncProcessException):
|
||||||
|
self.proc.stop()
|
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
import mock
|
import mock
|
||||||
|
import testtools
|
||||||
|
|
||||||
from neutron.agent.linux import utils
|
from neutron.agent.linux import utils
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
@ -106,3 +107,25 @@ class AgentUtilsReplaceFile(base.BaseTestCase):
|
|||||||
ntf.assert_has_calls(expected)
|
ntf.assert_has_calls(expected)
|
||||||
chmod.assert_called_once_with('/baz', 0o644)
|
chmod.assert_called_once_with('/baz', 0o644)
|
||||||
rename.assert_called_once_with('/baz', '/foo')
|
rename.assert_called_once_with('/baz', '/foo')
|
||||||
|
|
||||||
|
|
||||||
|
class TestFindChildPids(base.BaseTestCase):
|
||||||
|
|
||||||
|
def test_returns_empty_list_for_exit_code_1(self):
|
||||||
|
with mock.patch.object(utils, 'execute',
|
||||||
|
side_effect=RuntimeError('Exit code: 1')):
|
||||||
|
self.assertEqual(utils.find_child_pids(-1), [])
|
||||||
|
|
||||||
|
def test_returns_empty_list_for_no_output(self):
|
||||||
|
with mock.patch.object(utils, 'execute', return_value=''):
|
||||||
|
self.assertEqual(utils.find_child_pids(-1), [])
|
||||||
|
|
||||||
|
def test_returns_list_of_child_process_ids_for_good_ouput(self):
|
||||||
|
with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):
|
||||||
|
self.assertEqual(utils.find_child_pids(-1), ['123', '185'])
|
||||||
|
|
||||||
|
def test_raises_unknown_exception(self):
|
||||||
|
with testtools.ExpectedException(RuntimeError):
|
||||||
|
with mock.patch.object(utils, 'execute',
|
||||||
|
side_effect=RuntimeError()):
|
||||||
|
utils.find_child_pids(-1)
|
||||||
|
Loading…
Reference in New Issue
Block a user