fa546dd022
Change-Id: I41286e0b8f74c90b7078c3d3fb041b6586d95ab0 Closes-Bug: #1229324
222 lines
7.9 KiB
Python
222 lines
7.9 KiB
Python
# Copyright 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import eventlet
|
|
import eventlet.event
|
|
import eventlet.queue
|
|
|
|
from neutron.agent.linux import utils
|
|
from neutron.openstack.common import log as logging
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class AsyncProcessException(Exception):
    """Raised when an AsyncProcess is started or stopped in the wrong state."""
|
|
|
|
|
|
class AsyncProcess(object):
    """Manages an asynchronous process.

    This class spawns a new process via subprocess and uses
    greenthreads to read stderr and stdout asynchronously into queues
    that can be read via repeatedly calling iter_stdout() and
    iter_stderr().

    If respawn_interval is provided (a value of 0 or greater), any
    error in communicating with the managed process will result in the
    process and greenthreads being cleaned up and the process restarted
    after the specified interval.

    Example usage:

    >>> import time
    >>> proc = AsyncProcess(['ping'])
    >>> proc.start()
    >>> time.sleep(5)
    >>> proc.stop()
    >>> for line in proc.iter_stdout():
    ...     print(line)
    """

    def __init__(self, cmd, root_helper=None, respawn_interval=None):
        """Constructor.

        :param cmd: The list of command arguments to invoke.
        :param root_helper: Optional, utility to use when running shell cmds.
        :param respawn_interval: Optional, the interval in seconds to wait
            to respawn after unexpected process death. Respawn will
            only be attempted if a value of 0 or greater is provided.
        :raises ValueError: if respawn_interval is provided and negative.
        """
        self.cmd = cmd
        self.root_helper = root_helper
        if respawn_interval is not None and respawn_interval < 0:
            raise ValueError(_('respawn_interval must be >= 0 if provided.'))
        self.respawn_interval = respawn_interval
        self._process = None
        # A non-None kill event marks the process as started; see
        # start(), stop() and _kill().
        self._kill_event = None
        self._reset_queues()
        self._watchers = []

    def _reset_queues(self):
        """Replace the stdout and stderr line queues with empty ones."""
        self._stdout_lines = eventlet.queue.LightQueue()
        self._stderr_lines = eventlet.queue.LightQueue()

    def start(self):
        """Launch a process and monitor it asynchronously.

        :raises AsyncProcessException: if the process is already started.
        """
        if self._kill_event:
            raise AsyncProcessException(_('Process is already started'))
        else:
            LOG.debug(_('Launching async process [%s].'), self.cmd)
            self._spawn()

    def stop(self):
        """Halt the process and watcher threads.

        :raises AsyncProcessException: if the process is not running.
        """
        if self._kill_event:
            LOG.debug(_('Halting async process [%s].'), self.cmd)
            self._kill()
        else:
            raise AsyncProcessException(_('Process is not running.'))

    def _spawn(self):
        """Spawn a process and its watchers."""
        self._kill_event = eventlet.event.Event()
        # Only the process handle is needed; the second element returned
        # by create_process() is not used here.
        self._process, _cmd = utils.create_process(
            self.cmd, root_helper=self.root_helper)
        self._watchers = []
        for reader in (self._read_stdout, self._read_stderr):
            # Pass the stop event directly to the greenthread to
            # ensure that assignment of a new event to the instance
            # attribute does not prevent the greenthread from using
            # the original event.
            watcher = eventlet.spawn(self._watch_process,
                                     reader,
                                     self._kill_event)
            self._watchers.append(watcher)

    def _kill(self, respawning=False):
        """Kill the process and the associated watcher greenthreads.

        :param respawning: Optional, whether respawn will be subsequently
            attempted.
        """
        # Halt the greenthreads
        self._kill_event.send()

        pid = self._get_pid_to_kill()
        if pid:
            self._kill_process(pid)

        if not respawning:
            # Clear the kill event to ensure the process can be
            # explicitly started again.
            self._kill_event = None

    def _get_pid_to_kill(self):
        """Return the pid that must be killed to stop the managed process.

        :returns: the pid of the process to kill, or None if a root
            helper was used and its process no longer has any child.
        """
        pid = self._process.pid
        # If root helper was used, two or more processes will be created:
        #
        #  - a root helper process (e.g. sudo myscript)
        #  - possibly a rootwrap script (e.g. neutron-rootwrap)
        #  - a child process (e.g. myscript)
        #
        # Killing the root helper process will leave the child process
        # running, re-parented to init, so the only way to ensure that both
        # die is to target the child process directly.
        if self.root_helper:
            try:
                pid = utils.find_child_pids(pid)[0]
            except IndexError:
                # Process is already dead
                return None
            while True:
                try:
                    # We shouldn't have more than one child per process
                    # so keep getting the children of the first one
                    pid = utils.find_child_pids(pid)[0]
                except IndexError:
                    # Last process in the tree, return it
                    break
        return pid

    def _kill_process(self, pid):
        """Run 'kill -9' against the given pid.

        :param pid: the pid of the process to kill.
        :returns: True if the kill command succeeded or the pid no longer
            existed, False if any other error occurred (which is logged).
        """
        try:
            # A process started by a root helper will be running as
            # root and need to be killed via the same helper.
            utils.execute(['kill', '-9', pid], root_helper=self.root_helper)
        except Exception as ex:
            # A pid that has already gone away is not worth reporting.
            stale_pid = (isinstance(ex, RuntimeError) and
                         'No such process' in str(ex))
            if not stale_pid:
                LOG.exception(_('An error occurred while killing [%s].'),
                              self.cmd)
                return False
        return True

    def _should_respawn(self):
        """Return True if a respawn interval (0 or greater) was provided."""
        interval = self.respawn_interval
        # respawn_interval defaults to None, which must not be compared
        # with an integer: the result is arbitrary on Python 2 and a
        # TypeError on Python 3.
        return interval is not None and interval >= 0

    def _handle_process_error(self):
        """Kill the async process and respawn if necessary."""
        LOG.debug(_('Halting async process [%s] in response to an error.'),
                  self.cmd)
        respawning = self._should_respawn()
        self._kill(respawning=respawning)
        if respawning:
            eventlet.sleep(self.respawn_interval)
            LOG.debug(_('Respawning async process [%s].'), self.cmd)
            self._spawn()

    def _watch_process(self, callback, kill_event):
        """Repeatedly invoke callback until it fails or kill_event is sent.

        :param callback: callable taking no arguments that reads one line
            of output; it returns None once no more output is available.
        :param kill_event: the event whose readiness stops the loop.
        """
        while not kill_event.ready():
            try:
                # Only None signals the end of the stream. An empty
                # string is a legitimate blank line of output and must
                # not be mistaken for the death of the process.
                if callback() is None:
                    break
            except Exception:
                LOG.exception(_('An error occurred while communicating '
                                'with async process [%s].'), self.cmd)
                break
            # Ensure that watching a process with lots of output does
            # not block execution of other greenthreads.
            eventlet.sleep()
        # The kill event not being ready indicates that the loop was
        # broken out of due to an error in the watched process rather
        # than the loop condition being satisfied.
        if not kill_event.ready():
            self._handle_process_error()

    def _read(self, stream, queue):
        """Read one line from stream and append it, stripped, to queue.

        :param stream: object providing readline().
        :param queue: object providing put().
        :returns: the stripped line (possibly an empty string for a blank
            line), or None if readline() returned no data.
        """
        data = stream.readline()
        if not data:
            # For file-like streams readline() only returns an empty
            # value at end of stream; nothing is queued in that case.
            return None
        data = data.strip()
        queue.put(data)
        return data

    def _read_stdout(self):
        """Read one line of the process's stdout into the stdout queue."""
        return self._read(self._process.stdout, self._stdout_lines)

    def _read_stderr(self):
        """Read one line of the process's stderr into the stderr queue."""
        return self._read(self._process.stderr, self._stderr_lines)

    def _iter_queue(self, queue):
        """Yield the items currently in queue without blocking."""
        while True:
            try:
                yield queue.get_nowait()
            except eventlet.queue.Empty:
                break

    def iter_stdout(self):
        """Return an iterator over the stdout lines queued so far."""
        return self._iter_queue(self._stdout_lines)

    def iter_stderr(self):
        """Return an iterator over the stderr lines queued so far."""
        return self._iter_queue(self._stderr_lines)
|