diff --git a/ceilometer/ipmi/platform/ipmitool.py b/ceilometer/ipmi/platform/ipmitool.py index 9dd87f072..18b156482 100644 --- a/ceilometer/ipmi/platform/ipmitool.py +++ b/ceilometer/ipmi/platform/ipmitool.py @@ -16,10 +16,10 @@ # under the License. """Utils to run ipmitool for data collection""" +from oslo.concurrency import processutils from ceilometer.ipmi.platform import exception as ipmiexcept from ceilometer.openstack.common.gettextutils import _ -from ceilometer.openstack.common import processutils from ceilometer import utils diff --git a/ceilometer/openstack/common/processutils.py b/ceilometer/openstack/common/processutils.py deleted file mode 100644 index c210f05d7..000000000 --- a/ceilometer/openstack/common/processutils.py +++ /dev/null @@ -1,289 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -System-level utilities and helper functions. -""" - -import errno -import logging -import multiprocessing -import os -import random -import shlex -import signal - -from eventlet.green import subprocess -from eventlet import greenthread -from oslo.utils import strutils -import six - -from ceilometer.openstack.common._i18n import _ - - -LOG = logging.getLogger(__name__) - - -class InvalidArgumentError(Exception): - def __init__(self, message=None): - super(InvalidArgumentError, self).__init__(message) - - -class UnknownArgumentError(Exception): - def __init__(self, message=None): - super(UnknownArgumentError, self).__init__(message) - - -class ProcessExecutionError(Exception): - def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, - description=None): - self.exit_code = exit_code - self.stderr = stderr - self.stdout = stdout - self.cmd = cmd - self.description = description - - if description is None: - description = _("Unexpected error while running command.") - if exit_code is None: - exit_code = '-' - message = _('%(description)s\n' - 'Command: %(cmd)s\n' - 'Exit code: %(exit_code)s\n' - 'Stdout: %(stdout)r\n' - 'Stderr: %(stderr)r') % {'description': description, - 'cmd': cmd, - 'exit_code': exit_code, - 'stdout': stdout, - 'stderr': stderr} - super(ProcessExecutionError, self).__init__(message) - - -class NoRootWrapSpecified(Exception): - def __init__(self, message=None): - super(NoRootWrapSpecified, self).__init__(message) - - -def _subprocess_setup(): - # Python installs a SIGPIPE handler by default. This is usually not what - # non-Python subprocesses expect. - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - - -def execute(*cmd, **kwargs): - """Helper method to shell out and execute a command through subprocess. - - Allows optional retry. - - :param cmd: Passed to subprocess.Popen. - :type cmd: string - :param process_input: Send to opened process. - :type process_input: string - :param env_variables: Environment variables and their values that - will be set for the process. - :type env_variables: dict - :param check_exit_code: Single bool, int, or list of allowed exit - codes. Defaults to [0]. Raise - :class:`ProcessExecutionError` unless - program exits with one of these code. - :type check_exit_code: boolean, int, or [int] - :param delay_on_retry: True | False. Defaults to True. If set to True, - wait a short amount of time before retrying. - :type delay_on_retry: boolean - :param attempts: How many times to retry cmd. - :type attempts: int - :param run_as_root: True | False. Defaults to False. If set to True, - the command is prefixed by the command specified - in the root_helper kwarg. - :type run_as_root: boolean - :param root_helper: command to prefix to commands called with - run_as_root=True - :type root_helper: string - :param shell: whether or not there should be a shell used to - execute this command. Defaults to false. - :type shell: boolean - :param loglevel: log level for execute commands. - :type loglevel: int. (Should be logging.DEBUG or logging.INFO) - :returns: (stdout, stderr) from process execution - :raises: :class:`UnknownArgumentError` on - receiving unknown arguments - :raises: :class:`ProcessExecutionError` - """ - - process_input = kwargs.pop('process_input', None) - env_variables = kwargs.pop('env_variables', None) - check_exit_code = kwargs.pop('check_exit_code', [0]) - ignore_exit_code = False - delay_on_retry = kwargs.pop('delay_on_retry', True) - attempts = kwargs.pop('attempts', 1) - run_as_root = kwargs.pop('run_as_root', False) - root_helper = kwargs.pop('root_helper', '') - shell = kwargs.pop('shell', False) - loglevel = kwargs.pop('loglevel', logging.DEBUG) - - if isinstance(check_exit_code, bool): - ignore_exit_code = not check_exit_code - check_exit_code = [0] - elif isinstance(check_exit_code, int): - check_exit_code = [check_exit_code] - - if kwargs: - raise UnknownArgumentError(_('Got unknown keyword args: %r') % kwargs) - - if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0: - if not root_helper: - raise NoRootWrapSpecified( - message=_('Command requested root, but did not ' - 'specify a root helper.')) - cmd = shlex.split(root_helper) + list(cmd) - - cmd = map(str, cmd) - sanitized_cmd = strutils.mask_password(' '.join(cmd)) - - while attempts > 0: - attempts -= 1 - try: - LOG.log(loglevel, _('Running cmd (subprocess): %s'), sanitized_cmd) - _PIPE = subprocess.PIPE # pylint: disable=E1101 - - if os.name == 'nt': - preexec_fn = None - close_fds = False - else: - preexec_fn = _subprocess_setup - close_fds = True - - obj = subprocess.Popen(cmd, - stdin=_PIPE, - stdout=_PIPE, - stderr=_PIPE, - close_fds=close_fds, - preexec_fn=preexec_fn, - shell=shell, - env=env_variables) - result = None - for _i in six.moves.range(20): - # NOTE(russellb) 20 is an arbitrary number of retries to - # prevent any chance of looping forever here. - try: - if process_input is not None: - result = obj.communicate(process_input) - else: - result = obj.communicate() - except OSError as e: - if e.errno in (errno.EAGAIN, errno.EINTR): - continue - raise - break - obj.stdin.close() # pylint: disable=E1101 - _returncode = obj.returncode # pylint: disable=E1101 - LOG.log(loglevel, 'Result was %s' % _returncode) - if not ignore_exit_code and _returncode not in check_exit_code: - (stdout, stderr) = result - sanitized_stdout = strutils.mask_password(stdout) - sanitized_stderr = strutils.mask_password(stderr) - raise ProcessExecutionError(exit_code=_returncode, - stdout=sanitized_stdout, - stderr=sanitized_stderr, - cmd=sanitized_cmd) - return result - except ProcessExecutionError: - if not attempts: - raise - else: - LOG.log(loglevel, _('%r failed. Retrying.'), sanitized_cmd) - if delay_on_retry: - greenthread.sleep(random.randint(20, 200) / 100.0) - finally: - # NOTE(termie): this appears to be necessary to let the subprocess - # call clean something up in between calls, without - # it two execute calls in a row hangs the second one - greenthread.sleep(0) - - -def trycmd(*args, **kwargs): - """A wrapper around execute() to more easily handle warnings and errors. - - Returns an (out, err) tuple of strings containing the output of - the command's stdout and stderr. If 'err' is not empty then the - command can be considered to have failed. - - :discard_warnings True | False. Defaults to False. If set to True, - then for succeeding commands, stderr is cleared - - """ - discard_warnings = kwargs.pop('discard_warnings', False) - - try: - out, err = execute(*args, **kwargs) - failed = False - except ProcessExecutionError as exn: - out, err = '', six.text_type(exn) - failed = True - - if not failed and discard_warnings and err: - # Handle commands that output to stderr but otherwise succeed - err = '' - - return out, err - - -def ssh_execute(ssh, cmd, process_input=None, - addl_env=None, check_exit_code=True): - sanitized_cmd = strutils.mask_password(cmd) - LOG.debug('Running cmd (SSH): %s', sanitized_cmd) - if addl_env: - raise InvalidArgumentError(_('Environment not supported over SSH')) - - if process_input: - # This is (probably) fixable if we need it... - raise InvalidArgumentError(_('process_input not supported over SSH')) - - stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd) - channel = stdout_stream.channel - - # NOTE(justinsb): This seems suspicious... - # ...other SSH clients have buffering issues with this approach - stdout = stdout_stream.read() - sanitized_stdout = strutils.mask_password(stdout) - stderr = stderr_stream.read() - sanitized_stderr = strutils.mask_password(stderr) - - stdin_stream.close() - - exit_status = channel.recv_exit_status() - - # exit_status == -1 if no exit code was returned - if exit_status != -1: - LOG.debug('Result was %s' % exit_status) - if check_exit_code and exit_status != 0: - raise ProcessExecutionError(exit_code=exit_status, - stdout=sanitized_stdout, - stderr=sanitized_stderr, - cmd=sanitized_cmd) - - return (sanitized_stdout, sanitized_stderr) - - -def get_worker_count(): - """Utility to get the default worker count. - - @return: The number of CPUs if that can be determined, else a default - worker count of 1 is returned. - """ - try: - return multiprocessing.cpu_count() - except NotImplementedError: - return 1 diff --git a/ceilometer/utils.py b/ceilometer/utils.py index fcf818fbc..1aa5ecf36 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -27,12 +27,12 @@ import hashlib import multiprocessing import struct +from oslo.concurrency import processutils from oslo.config import cfg from oslo.utils import timeutils from oslo.utils import units import six -from ceilometer.openstack.common import processutils OPTS = [ cfg.StrOpt('rootwrap_config', diff --git a/openstack-common.conf b/openstack-common.conf index 20b167fb4..5f2f94438 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -9,7 +9,6 @@ module=log module=log_handler module=loopingcall module=policy -module=processutils module=service module=threadgroup diff --git a/requirements-py3.txt b/requirements-py3.txt index aa8bdf148..7696bf325 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -14,6 +14,7 @@ lockfile>=0.8 lxml>=2.3 msgpack-python>=0.4.0 netaddr>=0.7.12 +oslo.concurrency>=0.1.0 # Apache-2.0 oslo.config>=1.4.0 # Apache-2.0 oslo.db>=1.0.0 # Apache-2.0 PasteDeploy>=1.5.0 diff --git a/requirements.txt b/requirements.txt index 7fc40a4f3..781f23dd9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ lxml>=2.3 msgpack-python>=0.4.0 netaddr>=0.7.12 oslo.db>=1.0.0 # Apache-2.0 +oslo.concurrency>=0.1.0 # Apache-2.0 oslo.config>=1.4.0 # Apache-2.0 oslo.rootwrap>=1.3.0 oslo.vmware>=0.6.0 # Apache-2.0