Merge "Update oslo before bringing in exceptions"
This commit is contained in:
commit
7d913b0d9b
@ -23,11 +23,12 @@ context or provide additional information in their specific WSGI pipeline.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import itertools
|
import itertools
|
||||||
import uuid
|
|
||||||
|
from ceilometer.openstack.common import uuidutils
|
||||||
|
|
||||||
|
|
||||||
def generate_request_id():
|
def generate_request_id():
|
||||||
return 'req-' + str(uuid.uuid4())
|
return 'req-%s' % uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
|
||||||
class RequestContext(object):
|
class RequestContext(object):
|
||||||
|
@ -38,11 +38,21 @@ import functools
|
|||||||
import inspect
|
import inspect
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
import json
|
||||||
|
import types
|
||||||
import xmlrpclib
|
import xmlrpclib
|
||||||
|
|
||||||
from ceilometer.openstack.common import timeutils
|
from ceilometer.openstack.common import timeutils
|
||||||
|
|
||||||
|
|
||||||
|
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
||||||
|
inspect.isfunction, inspect.isgeneratorfunction,
|
||||||
|
inspect.isgenerator, inspect.istraceback, inspect.isframe,
|
||||||
|
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
||||||
|
inspect.isabstract]
|
||||||
|
|
||||||
|
_simple_types = (types.NoneType, int, basestring, bool, float, long)
|
||||||
|
|
||||||
|
|
||||||
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||||
level=0, max_depth=3):
|
level=0, max_depth=3):
|
||||||
"""Convert a complex object into primitives.
|
"""Convert a complex object into primitives.
|
||||||
@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
Therefore, convert_instances=True is lossy ... be aware.
|
Therefore, convert_instances=True is lossy ... be aware.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
# handle obvious types first - order of basic types determined by running
|
||||||
inspect.isfunction, inspect.isgeneratorfunction,
|
# full tests on nova project, resulting in the following counts:
|
||||||
inspect.isgenerator, inspect.istraceback, inspect.isframe,
|
# 572754 <type 'NoneType'>
|
||||||
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
# 460353 <type 'int'>
|
||||||
inspect.isabstract]
|
# 379632 <type 'unicode'>
|
||||||
for test in nasty:
|
# 274610 <type 'str'>
|
||||||
if test(value):
|
# 199918 <type 'dict'>
|
||||||
return unicode(value)
|
# 114200 <type 'datetime.datetime'>
|
||||||
|
# 51817 <type 'bool'>
|
||||||
|
# 26164 <type 'list'>
|
||||||
|
# 6491 <type 'float'>
|
||||||
|
# 283 <type 'tuple'>
|
||||||
|
# 19 <type 'long'>
|
||||||
|
if isinstance(value, _simple_types):
|
||||||
|
return value
|
||||||
|
|
||||||
# value of itertools.count doesn't get caught by inspects
|
if isinstance(value, datetime.datetime):
|
||||||
# above and results in infinite loop when list(value) is called.
|
if convert_datetime:
|
||||||
|
return timeutils.strtime(value)
|
||||||
|
else:
|
||||||
|
return value
|
||||||
|
|
||||||
|
# value of itertools.count doesn't get caught by nasty_type_tests
|
||||||
|
# and results in infinite loop when list(value) is called.
|
||||||
if type(value) == itertools.count:
|
if type(value) == itertools.count:
|
||||||
return unicode(value)
|
return unicode(value)
|
||||||
|
|
||||||
@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
convert_datetime=convert_datetime,
|
convert_datetime=convert_datetime,
|
||||||
level=level,
|
level=level,
|
||||||
max_depth=max_depth)
|
max_depth=max_depth)
|
||||||
|
if isinstance(value, dict):
|
||||||
|
return dict((k, recursive(v)) for k, v in value.iteritems())
|
||||||
|
elif isinstance(value, (list, tuple)):
|
||||||
|
return [recursive(lv) for lv in value]
|
||||||
|
|
||||||
# It's not clear why xmlrpclib created their own DateTime type, but
|
# It's not clear why xmlrpclib created their own DateTime type, but
|
||||||
# for our purposes, make it a datetime type which is explicitly
|
# for our purposes, make it a datetime type which is explicitly
|
||||||
# handled
|
# handled
|
||||||
if isinstance(value, xmlrpclib.DateTime):
|
if isinstance(value, xmlrpclib.DateTime):
|
||||||
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
||||||
|
|
||||||
if isinstance(value, (list, tuple)):
|
if convert_datetime and isinstance(value, datetime.datetime):
|
||||||
return [recursive(v) for v in value]
|
|
||||||
elif isinstance(value, dict):
|
|
||||||
return dict((k, recursive(v)) for k, v in value.iteritems())
|
|
||||||
elif convert_datetime and isinstance(value, datetime.datetime):
|
|
||||||
return timeutils.strtime(value)
|
return timeutils.strtime(value)
|
||||||
elif hasattr(value, 'iteritems'):
|
elif hasattr(value, 'iteritems'):
|
||||||
return recursive(dict(value.iteritems()), level=level + 1)
|
return recursive(dict(value.iteritems()), level=level + 1)
|
||||||
@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
# Ignore class member vars.
|
# Ignore class member vars.
|
||||||
return recursive(value.__dict__, level=level + 1)
|
return recursive(value.__dict__, level=level + 1)
|
||||||
else:
|
else:
|
||||||
|
if any(test(value) for test in _nasty_type_tests):
|
||||||
|
return unicode(value)
|
||||||
return value
|
return value
|
||||||
except TypeError:
|
except TypeError:
|
||||||
# Class objects are tricky since they may define something like
|
# Class objects are tricky since they may define something like
|
||||||
|
@ -37,7 +37,6 @@ import logging
|
|||||||
import logging.config
|
import logging.config
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
import os
|
import os
|
||||||
import stat
|
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
@ -104,10 +103,7 @@ logging_cli_opts = [
|
|||||||
generic_log_opts = [
|
generic_log_opts = [
|
||||||
cfg.BoolOpt('use_stderr',
|
cfg.BoolOpt('use_stderr',
|
||||||
default=True,
|
default=True,
|
||||||
help='Log output to standard error'),
|
help='Log output to standard error')
|
||||||
cfg.StrOpt('logfile_mode',
|
|
||||||
default='0644',
|
|
||||||
help='Default file mode used when creating log files'),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
log_opts = [
|
log_opts = [
|
||||||
@ -211,7 +207,27 @@ def _get_log_file_path(binary=None):
|
|||||||
return '%s.log' % (os.path.join(logdir, binary),)
|
return '%s.log' % (os.path.join(logdir, binary),)
|
||||||
|
|
||||||
|
|
||||||
class ContextAdapter(logging.LoggerAdapter):
|
class BaseLoggerAdapter(logging.LoggerAdapter):
|
||||||
|
|
||||||
|
def audit(self, msg, *args, **kwargs):
|
||||||
|
self.log(logging.AUDIT, msg, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class LazyAdapter(BaseLoggerAdapter):
|
||||||
|
def __init__(self, name='unknown', version='unknown'):
|
||||||
|
self._logger = None
|
||||||
|
self.extra = {}
|
||||||
|
self.name = name
|
||||||
|
self.version = version
|
||||||
|
|
||||||
|
@property
|
||||||
|
def logger(self):
|
||||||
|
if not self._logger:
|
||||||
|
self._logger = getLogger(self.name, self.version)
|
||||||
|
return self._logger
|
||||||
|
|
||||||
|
|
||||||
|
class ContextAdapter(BaseLoggerAdapter):
|
||||||
warn = logging.LoggerAdapter.warning
|
warn = logging.LoggerAdapter.warning
|
||||||
|
|
||||||
def __init__(self, logger, project_name, version_string):
|
def __init__(self, logger, project_name, version_string):
|
||||||
@ -219,8 +235,9 @@ class ContextAdapter(logging.LoggerAdapter):
|
|||||||
self.project = project_name
|
self.project = project_name
|
||||||
self.version = version_string
|
self.version = version_string
|
||||||
|
|
||||||
def audit(self, msg, *args, **kwargs):
|
@property
|
||||||
self.log(logging.AUDIT, msg, *args, **kwargs)
|
def handlers(self):
|
||||||
|
return self.logger.handlers
|
||||||
|
|
||||||
def deprecated(self, msg, *args, **kwargs):
|
def deprecated(self, msg, *args, **kwargs):
|
||||||
stdmsg = _("Deprecated: %s") % msg
|
stdmsg = _("Deprecated: %s") % msg
|
||||||
@ -340,7 +357,7 @@ class LogConfigError(Exception):
|
|||||||
def _load_log_config(log_config):
|
def _load_log_config(log_config):
|
||||||
try:
|
try:
|
||||||
logging.config.fileConfig(log_config)
|
logging.config.fileConfig(log_config)
|
||||||
except ConfigParser.Error, exc:
|
except ConfigParser.Error as exc:
|
||||||
raise LogConfigError(log_config, str(exc))
|
raise LogConfigError(log_config, str(exc))
|
||||||
|
|
||||||
|
|
||||||
@ -399,11 +416,6 @@ def _setup_logging_from_conf():
|
|||||||
filelog = logging.handlers.WatchedFileHandler(logpath)
|
filelog = logging.handlers.WatchedFileHandler(logpath)
|
||||||
log_root.addHandler(filelog)
|
log_root.addHandler(filelog)
|
||||||
|
|
||||||
mode = int(CONF.logfile_mode, 8)
|
|
||||||
st = os.stat(logpath)
|
|
||||||
if st.st_mode != (stat.S_IFREG | mode):
|
|
||||||
os.chmod(logpath, mode)
|
|
||||||
|
|
||||||
if CONF.use_stderr:
|
if CONF.use_stderr:
|
||||||
streamlog = ColorHandler()
|
streamlog = ColorHandler()
|
||||||
log_root.addHandler(streamlog)
|
log_root.addHandler(streamlog)
|
||||||
@ -432,14 +444,11 @@ def _setup_logging_from_conf():
|
|||||||
else:
|
else:
|
||||||
log_root.setLevel(logging.WARNING)
|
log_root.setLevel(logging.WARNING)
|
||||||
|
|
||||||
level = logging.NOTSET
|
|
||||||
for pair in CONF.default_log_levels:
|
for pair in CONF.default_log_levels:
|
||||||
mod, _sep, level_name = pair.partition('=')
|
mod, _sep, level_name = pair.partition('=')
|
||||||
level = logging.getLevelName(level_name)
|
level = logging.getLevelName(level_name)
|
||||||
logger = logging.getLogger(mod)
|
logger = logging.getLogger(mod)
|
||||||
logger.setLevel(level)
|
logger.setLevel(level)
|
||||||
for handler in log_root.handlers:
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
_loggers = {}
|
_loggers = {}
|
||||||
|
|
||||||
@ -452,6 +461,15 @@ def getLogger(name='unknown', version='unknown'):
|
|||||||
return _loggers[name]
|
return _loggers[name]
|
||||||
|
|
||||||
|
|
||||||
|
def getLazyLogger(name='unknown', version='unknown'):
|
||||||
|
"""
|
||||||
|
create a pass-through logger that does not create the real logger
|
||||||
|
until it is really needed and delegates all calls to the real logger
|
||||||
|
once it is created
|
||||||
|
"""
|
||||||
|
return LazyAdapter(name, version)
|
||||||
|
|
||||||
|
|
||||||
class WritableLogger(object):
|
class WritableLogger(object):
|
||||||
"""A thin wrapper that responds to `write` and logs."""
|
"""A thin wrapper that responds to `write` and logs."""
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ class FixedIntervalLoopingCall(LoopingCallBase):
|
|||||||
LOG.warn(_('task run outlasted interval by %s sec') %
|
LOG.warn(_('task run outlasted interval by %s sec') %
|
||||||
-delay)
|
-delay)
|
||||||
greenthread.sleep(delay if delay > 0 else 0)
|
greenthread.sleep(delay if delay > 0 else 0)
|
||||||
except LoopingCallDone, e:
|
except LoopingCallDone as e:
|
||||||
self.stop()
|
self.stop()
|
||||||
done.send(e.retvalue)
|
done.send(e.retvalue)
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -131,7 +131,7 @@ class DynamicLoopingCall(LoopingCallBase):
|
|||||||
LOG.debug(_('Dynamic looping call sleeping for %.02f '
|
LOG.debug(_('Dynamic looping call sleeping for %.02f '
|
||||||
'seconds'), idle)
|
'seconds'), idle)
|
||||||
greenthread.sleep(idle)
|
greenthread.sleep(idle)
|
||||||
except LoopingCallDone, e:
|
except LoopingCallDone as e:
|
||||||
self.stop()
|
self.stop()
|
||||||
done.send(e.retvalue)
|
done.send(e.retvalue)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -19,7 +19,8 @@
|
|||||||
Network-related utilities and helper functions.
|
Network-related utilities and helper functions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
from ceilometer.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -57,7 +57,6 @@ as it allows particular rules to be explicitly disabled.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import logging
|
|
||||||
import re
|
import re
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
@ -65,6 +64,7 @@ import urllib2
|
|||||||
|
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import jsonutils
|
from ceilometer.openstack.common import jsonutils
|
||||||
|
from ceilometer.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
135
ceilometer/openstack/common/processutils.py
Normal file
135
ceilometer/openstack/common/processutils.py
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
System-level utilities and helper functions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import random
|
||||||
|
import shlex
|
||||||
|
|
||||||
|
from eventlet.green import subprocess
|
||||||
|
from eventlet import greenthread
|
||||||
|
|
||||||
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
|
from ceilometer.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class UnknownArgumentError(Exception):
|
||||||
|
def __init__(self, message=None):
|
||||||
|
super(UnknownArgumentError, self).__init__(message)
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessExecutionError(Exception):
|
||||||
|
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
|
||||||
|
description=None):
|
||||||
|
if description is None:
|
||||||
|
description = "Unexpected error while running command."
|
||||||
|
if exit_code is None:
|
||||||
|
exit_code = '-'
|
||||||
|
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
|
||||||
|
% (description, cmd, exit_code, stdout, stderr))
|
||||||
|
super(ProcessExecutionError, self).__init__(message)
|
||||||
|
|
||||||
|
|
||||||
|
def execute(*cmd, **kwargs):
|
||||||
|
"""
|
||||||
|
Helper method to shell out and execute a command through subprocess with
|
||||||
|
optional retry.
|
||||||
|
|
||||||
|
:param cmd: Passed to subprocess.Popen.
|
||||||
|
:type cmd: string
|
||||||
|
:param process_input: Send to opened process.
|
||||||
|
:type proces_input: string
|
||||||
|
:param check_exit_code: Defaults to 0. Will raise
|
||||||
|
:class:`ProcessExecutionError`
|
||||||
|
if the command exits without returning this value
|
||||||
|
as a returncode
|
||||||
|
:type check_exit_code: int
|
||||||
|
:param delay_on_retry: True | False. Defaults to True. If set to True,
|
||||||
|
wait a short amount of time before retrying.
|
||||||
|
:type delay_on_retry: boolean
|
||||||
|
:param attempts: How many times to retry cmd.
|
||||||
|
:type attempts: int
|
||||||
|
:param run_as_root: True | False. Defaults to False. If set to True,
|
||||||
|
the command is prefixed by the command specified
|
||||||
|
in the root_helper kwarg.
|
||||||
|
:type run_as_root: boolean
|
||||||
|
:param root_helper: command to prefix all cmd's with
|
||||||
|
:type root_helper: string
|
||||||
|
:returns: (stdout, stderr) from process execution
|
||||||
|
:raises: :class:`UnknownArgumentError` on
|
||||||
|
receiving unknown arguments
|
||||||
|
:raises: :class:`ProcessExecutionError`
|
||||||
|
"""
|
||||||
|
|
||||||
|
process_input = kwargs.pop('process_input', None)
|
||||||
|
check_exit_code = kwargs.pop('check_exit_code', 0)
|
||||||
|
delay_on_retry = kwargs.pop('delay_on_retry', True)
|
||||||
|
attempts = kwargs.pop('attempts', 1)
|
||||||
|
run_as_root = kwargs.pop('run_as_root', False)
|
||||||
|
root_helper = kwargs.pop('root_helper', '')
|
||||||
|
if len(kwargs):
|
||||||
|
raise UnknownArgumentError(_('Got unknown keyword args '
|
||||||
|
'to utils.execute: %r') % kwargs)
|
||||||
|
if run_as_root:
|
||||||
|
cmd = shlex.split(root_helper) + list(cmd)
|
||||||
|
cmd = map(str, cmd)
|
||||||
|
|
||||||
|
while attempts > 0:
|
||||||
|
attempts -= 1
|
||||||
|
try:
|
||||||
|
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
|
||||||
|
_PIPE = subprocess.PIPE # pylint: disable=E1101
|
||||||
|
obj = subprocess.Popen(cmd,
|
||||||
|
stdin=_PIPE,
|
||||||
|
stdout=_PIPE,
|
||||||
|
stderr=_PIPE,
|
||||||
|
close_fds=True)
|
||||||
|
result = None
|
||||||
|
if process_input is not None:
|
||||||
|
result = obj.communicate(process_input)
|
||||||
|
else:
|
||||||
|
result = obj.communicate()
|
||||||
|
obj.stdin.close() # pylint: disable=E1101
|
||||||
|
_returncode = obj.returncode # pylint: disable=E1101
|
||||||
|
if _returncode:
|
||||||
|
LOG.debug(_('Result was %s') % _returncode)
|
||||||
|
if (isinstance(check_exit_code, int) and
|
||||||
|
not isinstance(check_exit_code, bool) and
|
||||||
|
_returncode != check_exit_code):
|
||||||
|
(stdout, stderr) = result
|
||||||
|
raise ProcessExecutionError(exit_code=_returncode,
|
||||||
|
stdout=stdout,
|
||||||
|
stderr=stderr,
|
||||||
|
cmd=' '.join(cmd))
|
||||||
|
return result
|
||||||
|
except ProcessExecutionError:
|
||||||
|
if not attempts:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
LOG.debug(_('%r failed. Retrying.'), cmd)
|
||||||
|
if delay_on_retry:
|
||||||
|
greenthread.sleep(random.randint(20, 200) / 100.0)
|
||||||
|
finally:
|
||||||
|
# NOTE(termie): this appears to be necessary to let the subprocess
|
||||||
|
# call clean something up in between calls, without
|
||||||
|
# it two execute calls in a row hangs the second one
|
||||||
|
greenthread.sleep(0)
|
@ -26,13 +26,13 @@ For some wrappers that add message versioning to rpc, see:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
from ceilometer.openstack.common import importutils
|
from ceilometer.openstack.common import importutils
|
||||||
from ceilometer.openstack.common import local
|
from ceilometer.openstack.common import local
|
||||||
|
from ceilometer.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
ctxt = unpack_context(self.conf, message_data)
|
ctxt = unpack_context(self.conf, message_data)
|
||||||
method = message_data.get('method')
|
method = message_data.get('method')
|
||||||
args = message_data.get('args', {})
|
args = message_data.get('args', {})
|
||||||
version = message_data.get('version', None)
|
version = message_data.get('version')
|
||||||
|
namespace = message_data.get('namespace')
|
||||||
if not method:
|
if not method:
|
||||||
LOG.warn(_('no method for message: %s') % message_data)
|
LOG.warn(_('no method for message: %s') % message_data)
|
||||||
ctxt.reply(_('No method for message: %s') % message_data,
|
ctxt.reply(_('No method for message: %s') % message_data,
|
||||||
connection_pool=self.connection_pool)
|
connection_pool=self.connection_pool)
|
||||||
return
|
return
|
||||||
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
|
self.pool.spawn_n(self._process_data, ctxt, version, method,
|
||||||
|
namespace, args)
|
||||||
|
|
||||||
def _process_data(self, ctxt, version, method, args):
|
def _process_data(self, ctxt, version, method, namespace, args):
|
||||||
"""Process a message in a new thread.
|
"""Process a message in a new thread.
|
||||||
|
|
||||||
If the proxy object we have has a dispatch method
|
If the proxy object we have has a dispatch method
|
||||||
@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
"""
|
"""
|
||||||
ctxt.update_store()
|
ctxt.update_store()
|
||||||
try:
|
try:
|
||||||
rval = self.proxy.dispatch(ctxt, version, method, **args)
|
rval = self.proxy.dispatch(ctxt, version, method, namespace,
|
||||||
|
**args)
|
||||||
# Check if the result was a generator
|
# Check if the result was a generator
|
||||||
if inspect.isgenerator(rval):
|
if inspect.isgenerator(rval):
|
||||||
for x in rval:
|
for x in rval:
|
||||||
|
@ -276,7 +276,7 @@ def _safe_log(log_func, msg, msg_data):
|
|||||||
for elem in arg[:-1]:
|
for elem in arg[:-1]:
|
||||||
d = d[elem]
|
d = d[elem]
|
||||||
d[arg[-1]] = '<SANITIZED>'
|
d[arg[-1]] = '<SANITIZED>'
|
||||||
except KeyError, e:
|
except KeyError as e:
|
||||||
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
|
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
|
||||||
{'item': arg,
|
{'item': arg,
|
||||||
'err': e})
|
'err': e})
|
||||||
@ -419,7 +419,7 @@ class ClientException(Exception):
|
|||||||
def catch_client_exception(exceptions, func, *args, **kwargs):
|
def catch_client_exception(exceptions, func, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
return func(*args, **kwargs)
|
return func(*args, **kwargs)
|
||||||
except Exception, e:
|
except Exception as e:
|
||||||
if type(e) in exceptions:
|
if type(e) in exceptions:
|
||||||
raise ClientException()
|
raise ClientException()
|
||||||
else:
|
else:
|
||||||
|
@ -103,13 +103,16 @@ class RpcDispatcher(object):
|
|||||||
self.callbacks = callbacks
|
self.callbacks = callbacks
|
||||||
super(RpcDispatcher, self).__init__()
|
super(RpcDispatcher, self).__init__()
|
||||||
|
|
||||||
def dispatch(self, ctxt, version, method, **kwargs):
|
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
||||||
"""Dispatch a message based on a requested version.
|
"""Dispatch a message based on a requested version.
|
||||||
|
|
||||||
:param ctxt: The request context
|
:param ctxt: The request context
|
||||||
:param version: The requested API version from the incoming message
|
:param version: The requested API version from the incoming message
|
||||||
:param method: The method requested to be called by the incoming
|
:param method: The method requested to be called by the incoming
|
||||||
message.
|
message.
|
||||||
|
:param namespace: The namespace for the requested method. If None,
|
||||||
|
the dispatcher will look for a method on a callback
|
||||||
|
object with no namespace set.
|
||||||
:param kwargs: A dict of keyword arguments to be passed to the method.
|
:param kwargs: A dict of keyword arguments to be passed to the method.
|
||||||
|
|
||||||
:returns: Whatever is returned by the underlying method that gets
|
:returns: Whatever is returned by the underlying method that gets
|
||||||
@ -120,13 +123,25 @@ class RpcDispatcher(object):
|
|||||||
|
|
||||||
had_compatible = False
|
had_compatible = False
|
||||||
for proxyobj in self.callbacks:
|
for proxyobj in self.callbacks:
|
||||||
if hasattr(proxyobj, 'RPC_API_VERSION'):
|
# Check for namespace compatibility
|
||||||
|
try:
|
||||||
|
cb_namespace = proxyobj.RPC_API_NAMESPACE
|
||||||
|
except AttributeError:
|
||||||
|
cb_namespace = None
|
||||||
|
|
||||||
|
if namespace != cb_namespace:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check for version compatibility
|
||||||
|
try:
|
||||||
rpc_api_version = proxyobj.RPC_API_VERSION
|
rpc_api_version = proxyobj.RPC_API_VERSION
|
||||||
else:
|
except AttributeError:
|
||||||
rpc_api_version = '1.0'
|
rpc_api_version = '1.0'
|
||||||
|
|
||||||
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
|
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
|
||||||
version)
|
version)
|
||||||
had_compatible = had_compatible or is_compatible
|
had_compatible = had_compatible or is_compatible
|
||||||
|
|
||||||
if not hasattr(proxyobj, method):
|
if not hasattr(proxyobj, method):
|
||||||
continue
|
continue
|
||||||
if is_compatible:
|
if is_compatible:
|
||||||
|
@ -57,13 +57,14 @@ class Consumer(object):
|
|||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.proxy = proxy
|
self.proxy = proxy
|
||||||
|
|
||||||
def call(self, context, version, method, args, timeout):
|
def call(self, context, version, method, namespace, args, timeout):
|
||||||
done = eventlet.event.Event()
|
done = eventlet.event.Event()
|
||||||
|
|
||||||
def _inner():
|
def _inner():
|
||||||
ctxt = RpcContext.from_dict(context.to_dict())
|
ctxt = RpcContext.from_dict(context.to_dict())
|
||||||
try:
|
try:
|
||||||
rval = self.proxy.dispatch(context, version, method, **args)
|
rval = self.proxy.dispatch(context, version, method,
|
||||||
|
namespace, **args)
|
||||||
res = []
|
res = []
|
||||||
# Caller might have called ctxt.reply() manually
|
# Caller might have called ctxt.reply() manually
|
||||||
for (reply, failure) in ctxt._response:
|
for (reply, failure) in ctxt._response:
|
||||||
@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
|
|||||||
return
|
return
|
||||||
args = msg.get('args', {})
|
args = msg.get('args', {})
|
||||||
version = msg.get('version', None)
|
version = msg.get('version', None)
|
||||||
|
namespace = msg.get('namespace', None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
consumer = CONSUMERS[topic][0]
|
consumer = CONSUMERS[topic][0]
|
||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
return iter([None])
|
return iter([None])
|
||||||
else:
|
else:
|
||||||
return consumer.call(context, version, method, args, timeout)
|
return consumer.call(context, version, method, namespace, args,
|
||||||
|
timeout)
|
||||||
|
|
||||||
|
|
||||||
def call(conf, context, topic, msg, timeout=None):
|
def call(conf, context, topic, msg, timeout=None):
|
||||||
@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
|
|||||||
return
|
return
|
||||||
args = msg.get('args', {})
|
args = msg.get('args', {})
|
||||||
version = msg.get('version', None)
|
version = msg.get('version', None)
|
||||||
|
namespace = msg.get('namespace', None)
|
||||||
|
|
||||||
for consumer in CONSUMERS.get(topic, []):
|
for consumer in CONSUMERS.get(topic, []):
|
||||||
try:
|
try:
|
||||||
consumer.call(context, version, method, args, None)
|
consumer.call(context, version, method, namespace, args, None)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
@ -176,7 +176,7 @@ class ConsumerBase(object):
|
|||||||
"""Cancel the consuming from the queue, if it has started"""
|
"""Cancel the consuming from the queue, if it has started"""
|
||||||
try:
|
try:
|
||||||
self.queue.cancel(self.tag)
|
self.queue.cancel(self.tag)
|
||||||
except KeyError, e:
|
except KeyError as e:
|
||||||
# NOTE(comstud): Kludge to get around a amqplib bug
|
# NOTE(comstud): Kludge to get around a amqplib bug
|
||||||
if str(e) != "u'%s'" % self.tag:
|
if str(e) != "u'%s'" % self.tag:
|
||||||
raise
|
raise
|
||||||
@ -520,7 +520,7 @@ class Connection(object):
|
|||||||
return
|
return
|
||||||
except (IOError, self.connection_errors) as e:
|
except (IOError, self.connection_errors) as e:
|
||||||
pass
|
pass
|
||||||
except Exception, e:
|
except Exception as e:
|
||||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||||
# to return an error not covered by its transport
|
# to return an error not covered by its transport
|
||||||
# connection_errors in the case of a timeout waiting for
|
# connection_errors in the case of a timeout waiting for
|
||||||
@ -561,10 +561,10 @@ class Connection(object):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return method(*args, **kwargs)
|
return method(*args, **kwargs)
|
||||||
except (self.connection_errors, socket.timeout, IOError), e:
|
except (self.connection_errors, socket.timeout, IOError) as e:
|
||||||
if error_callback:
|
if error_callback:
|
||||||
error_callback(e)
|
error_callback(e)
|
||||||
except Exception, e:
|
except Exception as e:
|
||||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||||
# to return an error not covered by its transport
|
# to return an error not covered by its transport
|
||||||
# connection_errors in the case of a timeout waiting for
|
# connection_errors in the case of a timeout waiting for
|
||||||
|
@ -346,7 +346,7 @@ class Connection(object):
|
|||||||
try:
|
try:
|
||||||
self.connection_create(broker)
|
self.connection_create(broker)
|
||||||
self.connection.open()
|
self.connection.open()
|
||||||
except qpid_exceptions.ConnectionError, e:
|
except qpid_exceptions.ConnectionError as e:
|
||||||
msg_dict = dict(e=e, delay=delay)
|
msg_dict = dict(e=e, delay=delay)
|
||||||
msg = _("Unable to connect to AMQP server: %(e)s. "
|
msg = _("Unable to connect to AMQP server: %(e)s. "
|
||||||
"Sleeping %(delay)s seconds") % msg_dict
|
"Sleeping %(delay)s seconds") % msg_dict
|
||||||
|
@ -276,12 +276,13 @@ class InternalContext(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
result = proxy.dispatch(
|
result = proxy.dispatch(
|
||||||
ctx, data['version'], data['method'], **data['args'])
|
ctx, data['version'], data['method'],
|
||||||
|
data.get('namespace'), **data['args'])
|
||||||
return ConsumerBase.normalize_reply(result, ctx.replies)
|
return ConsumerBase.normalize_reply(result, ctx.replies)
|
||||||
except greenlet.GreenletExit:
|
except greenlet.GreenletExit:
|
||||||
# ignore these since they are just from shutdowns
|
# ignore these since they are just from shutdowns
|
||||||
pass
|
pass
|
||||||
except rpc_common.ClientException, e:
|
except rpc_common.ClientException as e:
|
||||||
LOG.debug(_("Expected exception during message handling (%s)") %
|
LOG.debug(_("Expected exception during message handling (%s)") %
|
||||||
e._exc_info[1])
|
e._exc_info[1])
|
||||||
return {'exc':
|
return {'exc':
|
||||||
@ -351,7 +352,7 @@ class ConsumerBase(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
proxy.dispatch(ctx, data['version'],
|
proxy.dispatch(ctx, data['version'],
|
||||||
data['method'], **data['args'])
|
data['method'], data.get('namespace'), **data['args'])
|
||||||
|
|
||||||
|
|
||||||
class ZmqBaseReactor(ConsumerBase):
|
class ZmqBaseReactor(ConsumerBase):
|
||||||
|
@ -58,9 +58,13 @@ class RpcProxy(object):
|
|||||||
"""Return the topic to use for a message."""
|
"""Return the topic to use for a message."""
|
||||||
return topic if topic else self.topic
|
return topic if topic else self.topic
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def make_namespaced_msg(method, namespace, **kwargs):
|
||||||
|
return {'method': method, 'namespace': namespace, 'args': kwargs}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_msg(method, **kwargs):
|
def make_msg(method, **kwargs):
|
||||||
return {'method': method, 'args': kwargs}
|
return RpcProxy.make_namespaced_msg(method, None, **kwargs)
|
||||||
|
|
||||||
def call(self, context, msg, topic=None, version=None, timeout=None):
|
def call(self, context, msg, topic=None, version=None, timeout=None):
|
||||||
"""rpc.call() a remote method.
|
"""rpc.call() a remote method.
|
||||||
|
41
ceilometer/openstack/common/rpc/zmq_receiver.py
Executable file
41
ceilometer/openstack/common/rpc/zmq_receiver.py
Executable file
@ -0,0 +1,41 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from ceilometer.openstack.common import log as logging
|
||||||
|
from ceilometer.openstack.common import rpc
|
||||||
|
from ceilometer.openstack.common.rpc import impl_zmq
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(rpc.rpc_opts)
|
||||||
|
CONF.register_opts(impl_zmq.zmq_opts)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
CONF(sys.argv[1:], project='oslo')
|
||||||
|
logging.setup("oslo")
|
||||||
|
|
||||||
|
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
|
||||||
|
reactor.consume_in_thread()
|
||||||
|
reactor.wait()
|
39
ceilometer/openstack/common/uuidutils.py
Normal file
39
ceilometer/openstack/common/uuidutils.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright (c) 2012 Intel Corporation.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
UUID related utilities and helper functions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
def generate_uuid():
|
||||||
|
return str(uuid.uuid4())
|
||||||
|
|
||||||
|
|
||||||
|
def is_uuid_like(val):
|
||||||
|
"""Returns validation of a value as a UUID.
|
||||||
|
|
||||||
|
For our purposes, a UUID is a canonical form string:
|
||||||
|
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return str(uuid.UUID(val)) == val
|
||||||
|
except (TypeError, ValueError, AttributeError):
|
||||||
|
return False
|
Loading…
x
Reference in New Issue
Block a user