Updates Oslo modules

parent 479531e5cc
commit 2a9be0e16d
@@ -0,0 +1,17 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+
+
+six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
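Note: the new module registers a six "moved module" so test code can import mox uniformly and transparently get mox3 where the original mox is unavailable. A minimal sketch of how a consumer would use the shim (assuming the mox3 package is installed):

    import six
    six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))

    # Resolves to the 'mox' package on Python 2 and to 'mox3.mox' on
    # Python 3, so test code stays version-agnostic.
    from six.moves import mox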
@@ -25,7 +25,7 @@ import uuid


 def generate_request_id():
-    return 'req-%s' % str(uuid.uuid4())
+    return b'req-' + str(uuid.uuid4()).encode('ascii')


 class RequestContext(object):
@@ -77,6 +77,21 @@ class RequestContext(object):
                 'instance_uuid': self.instance_uuid,
                 'user_identity': user_idt}

+    @classmethod
+    def from_dict(cls, ctx):
+        return cls(
+            auth_token=ctx.get("auth_token"),
+            user=ctx.get("user"),
+            tenant=ctx.get("tenant"),
+            domain=ctx.get("domain"),
+            user_domain=ctx.get("user_domain"),
+            project_domain=ctx.get("project_domain"),
+            is_admin=ctx.get("is_admin", False),
+            read_only=ctx.get("read_only", False),
+            show_deleted=ctx.get("show_deleted", False),
+            request_id=ctx.get("request_id"),
+            instance_uuid=ctx.get("instance_uuid"))
+

 def get_admin_context(show_deleted=False):
     context = RequestContext(None,
@@ -98,3 +113,14 @@ def get_context_from_function_and_args(function, args, kwargs):
             return arg

     return None
+
+
+def is_user_context(context):
+    """Indicates if the request context is a normal user."""
+    if not context:
+        return False
+    if context.is_admin:
+        return False
+    if not context.user_id or not context.project_id:
+        return False
+    return True
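Note: with from_dict added, a RequestContext can now round-trip through a plain dict, which is how the RPC layer ships it over the wire. A hedged sketch, using the keyword arguments mirrored from from_dict above (field values are illustrative):

    ctx = RequestContext(auth_token='token', user='demo', tenant='demo')
    ctx2 = RequestContext.from_dict(ctx.to_dict())
    assert ctx2.user == 'demo'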
@@ -29,7 +29,7 @@ import eventlet.backdoor
 import greenlet
 from oslo.config import cfg

-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _LI
 from cloudbaseinit.openstack.common import log as logging

 help_for_backdoor_port = (
@@ -41,7 +41,6 @@ help_for_backdoor_port = (
     "chosen port is displayed in the service's log file.")
 eventlet_backdoor_opts = [
     cfg.StrOpt('backdoor_port',
-               default=None,
                help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
 ]

@@ -137,8 +136,10 @@ def initialize_if_enabled():
     # In the case of backdoor port being zero, a port number is assigned by
     # listen(). In any case, pull the port number out here.
     port = sock.getsockname()[1]
-    LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
-             {'port': port, 'pid': os.getpid()})
+    LOG.info(
+        _LI('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+        {'port': port, 'pid': os.getpid()}
+    )
     eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
                      locals=backdoor_locals)
     return port
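Note: this sync switches log calls from the generic _() marker to level-specific markers (_LI, _LW, _LE, _LC) so each level can draw from its own translation catalog. A minimal sketch of the convention, using the marker names defined in the gettextutils hunk below:

    from cloudbaseinit.openstack.common.gettextutils import _LI
    from cloudbaseinit.openstack.common import log as logging

    LOG = logging.getLogger(__name__)
    LOG.info(_LI('Service started on port %(port)d'), {'port': 8080})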
@@ -24,7 +24,7 @@ import traceback

 import six

-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _LE


 class save_and_reraise_exception(object):
@@ -49,9 +49,22 @@ class save_and_reraise_exception(object):
                     decide_if_need_reraise()
                     if not should_be_reraised:
                         ctxt.reraise = False

+    If another exception occurs and the reraise flag is False,
+    the saved exception will not be logged.
+
+    If the caller wants to raise a new exception during exception handling,
+    they set reraise to False initially, with the ability to set it back to
+    True if needed::
+
+        except Exception:
+            with save_and_reraise_exception(reraise=False) as ctxt:
+                [if statements to determine whether to raise a new exception]
+                # Not raising a new exception, so reraise
+                ctxt.reraise = True
     """
-    def __init__(self):
-        self.reraise = True
+    def __init__(self, reraise=True):
+        self.reraise = reraise

     def __enter__(self):
         self.type_, self.value, self.tb, = sys.exc_info()
@@ -59,10 +72,11 @@ class save_and_reraise_exception(object):

     def __exit__(self, exc_type, exc_val, exc_tb):
         if exc_type is not None:
-            logging.error(_('Original exception being dropped: %s'),
-                          traceback.format_exception(self.type_,
-                                                     self.value,
-                                                     self.tb))
+            if self.reraise:
+                logging.error(_LE('Original exception being dropped: %s'),
+                              traceback.format_exception(self.type_,
+                                                         self.value,
+                                                         self.tb))
             return False
         if self.reraise:
             six.reraise(self.type_, self.value, self.tb)
@@ -88,8 +102,8 @@ def forever_retry_uncaught_exceptions(infunc):
             if (cur_time - last_log_time > 60 or
                     this_exc_message != last_exc_message):
                 logging.exception(
-                    _('Unexpected exception occurred %d time(s)... '
-                      'retrying.') % exc_count)
+                    _LE('Unexpected exception occurred %d time(s)... '
+                        'retrying.') % exc_count)
                 last_log_time = cur_time
                 last_exc_message = this_exc_message
                 exc_count = 0
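Note: the new reraise=True constructor argument lets callers suppress re-raising up front instead of toggling it inside the block. A hedged sketch of the replace-the-exception pattern (risky_cleanup and is_fatal are hypothetical helpers):

    from cloudbaseinit.openstack.common import excutils

    try:
        risky_cleanup()
    except Exception:
        with excutils.save_and_reraise_exception(reraise=False) as ctxt:
            if is_fatal():
                # Propagate the original exception after all.
                ctxt.reraise = True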
@@ -27,18 +27,119 @@ import gettext
 import locale
 from logging import handlers
 import os
 import re

 from babel import localedata
 import six

-_localedir = os.environ.get('cloudbaseinit'.upper() + '_LOCALEDIR')
-_t = gettext.translation('cloudbaseinit', localedir=_localedir, fallback=True)
-
 _AVAILABLE_LANGUAGES = {}

+# FIXME(dhellmann): Remove this when moving to oslo.i18n.
 USE_LAZY = False


+class TranslatorFactory(object):
+    """Create translator functions."""
+
+    def __init__(self, domain, localedir=None):
+        """Establish a set of translation functions for the domain.
+
+        :param domain: Name of translation domain,
+                       specifying a message catalog.
+        :type domain: str
+        :param localedir: Directory with translation catalogs.
+        :type localedir: str
+        """
+        self.domain = domain
+        if localedir is None:
+            localedir = os.environ.get(domain.upper() + '_LOCALEDIR')
+        self.localedir = localedir
+
+    def _make_translation_func(self, domain=None):
+        """Return a new translation function ready for use.
+
+        Takes into account whether or not lazy translation is being
+        done.
+
+        The domain can be specified to override the default from the
+        factory, but the localedir from the factory is always used
+        because we assume the log-level translation catalogs are
+        installed in the same directory as the main application
+        catalog.
+
+        """
+        if domain is None:
+            domain = self.domain
+        t = gettext.translation(domain,
+                                localedir=self.localedir,
+                                fallback=True)
+        # Use the appropriate method of the translation object based
+        # on the python version.
+        m = t.gettext if six.PY3 else t.ugettext
+
+        def f(msg):
+            """oslo.i18n.gettextutils translation function."""
+            if USE_LAZY:
+                return Message(msg, domain=domain)
+            return m(msg)
+        return f
+
+    @property
+    def primary(self):
+        "The default translation function."
+        return self._make_translation_func()
+
+    def _make_log_translation_func(self, level):
+        return self._make_translation_func(self.domain + '-log-' + level)
+
+    @property
+    def log_info(self):
+        "Translate info-level log messages."
+        return self._make_log_translation_func('info')
+
+    @property
+    def log_warning(self):
+        "Translate warning-level log messages."
+        return self._make_log_translation_func('warning')
+
+    @property
+    def log_error(self):
+        "Translate error-level log messages."
+        return self._make_log_translation_func('error')
+
+    @property
+    def log_critical(self):
+        "Translate critical-level log messages."
+        return self._make_log_translation_func('critical')
+
+
+# NOTE(dhellmann): When this module moves out of the incubator into
+# oslo.i18n, these global variables can be moved to an integration
+# module within each application.
+
+# Create the global translation functions.
+_translators = TranslatorFactory('cloudbaseinit')
+
+# The primary translation function using the well-known name "_"
+_ = _translators.primary
+
+# Translators for log levels.
+#
+# The abbreviated names are meant to reflect the usual use of a short
+# name like '_'. The "L" is for "log" and the other letter comes from
+# the level.
+_LI = _translators.log_info
+_LW = _translators.log_warning
+_LE = _translators.log_error
+_LC = _translators.log_critical
+
+# NOTE(dhellmann): End of globals that will move to the application's
+# integration module.
+

 def enable_lazy():
     """Convenience function for configuring _() to use lazy gettext
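Note: per-level translators simply target derived catalog domains (for example 'cloudbaseinit-log-error'). A hedged sketch of building a factory for another domain (the 'myapp' domain is illustrative):

    from cloudbaseinit.openstack.common import gettextutils

    tf = gettextutils.TranslatorFactory('myapp')
    _ = tf.primary        # looks up the 'myapp' catalog
    _LE = tf.log_error    # looks up the 'myapp-log-error' catalog
    print(_('untranslated text passes through via fallback=True'))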
@@ -51,16 +152,7 @@ def enable_lazy():
     USE_LAZY = True


-def _(msg):
-    if USE_LAZY:
-        return Message(msg, domain='cloudbaseinit')
-    else:
-        if six.PY3:
-            return _t.gettext(msg)
-        return _t.ugettext(msg)
-
-
-def install(domain, lazy=False):
+def install(domain):
     """Install a _() function using the given translation domain.

     Given a translation domain, install a _() function using gettext's
@@ -71,43 +163,14 @@ def install(domain, lazy=False):
     a translation-domain-specific environment variable (e.g.
     NOVA_LOCALEDIR).

+    Note that to enable lazy translation, enable_lazy must be
+    called.
+
     :param domain: the translation domain
-    :param lazy: indicates whether or not to install the lazy _() function.
-                 The lazy _() introduces a way to do deferred translation
-                 of messages by installing a _ that builds Message objects,
-                 instead of strings, which can then be lazily translated into
-                 any available locale.
     """
-    if lazy:
-        # NOTE(mrodden): Lazy gettext functionality.
-        #
-        # The following introduces a deferred way to do translations on
-        # messages in OpenStack. We override the standard _() function
-        # and % (format string) operation to build Message objects that can
-        # later be translated when we have more information.
-        def _lazy_gettext(msg):
-            """Create and return a Message object.
-
-            Lazy gettext function for a given domain, it is a factory method
-            for a project/module to get a lazy gettext function for its own
-            translation domain (i.e. nova, glance, cinder, etc.)
-
-            Message encapsulates a string so that we can translate
-            it later when needed.
-            """
-            return Message(msg, domain=domain)
-
-        from six import moves
-        moves.builtins.__dict__['_'] = _lazy_gettext
-    else:
-        localedir = '%s_LOCALEDIR' % domain.upper()
-        if six.PY3:
-            gettext.install(domain,
-                            localedir=os.environ.get(localedir))
-        else:
-            gettext.install(domain,
-                            localedir=os.environ.get(localedir),
-                            unicode=True)
+    from six import moves
+    tf = TranslatorFactory(domain)
+    moves.builtins.__dict__['_'] = tf.primary


 class Message(six.text_type):
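Note: install() now always routes the builtin _() through a TranslatorFactory; laziness becomes a separate process-wide switch rather than an install-time flag. A hedged usage sketch (domain name illustrative):

    from cloudbaseinit.openstack.common import gettextutils

    gettextutils.enable_lazy()     # optional: _() then returns Message objects
    gettextutils.install('myapp')  # installs _() into builtins

    print(_('this string is looked up in the myapp catalog'))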
@@ -214,47 +277,22 @@ class Message(six.text_type):
         if other is None:
             params = (other,)
         elif isinstance(other, dict):
-            params = self._trim_dictionary_parameters(other)
+            # Merge the dictionaries
+            # Copy each item in case one does not support deep copy.
+            params = {}
+            if isinstance(self.params, dict):
+                for key, val in self.params.items():
+                    params[key] = self._copy_param(val)
+            for key, val in other.items():
+                params[key] = self._copy_param(val)
         else:
             params = self._copy_param(other)
         return params

-    def _trim_dictionary_parameters(self, dict_param):
-        """Return a dict that only has matching entries in the msgid."""
-        # NOTE(luisg): Here we trim down the dictionary passed as parameters
-        # to avoid carrying a lot of unnecessary weight around in the message
-        # object, for example if someone passes in Message() % locals() but
-        # only some params are used, and additionally we prevent errors for
-        # non-deepcopyable objects by unicoding() them.
-
-        # Look for %(param) keys in msgid;
-        # Skip %% and deal with the case where % is first character on the line
-        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
-
-        # If we don't find any %(param) keys but have a %s
-        if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
-            # Apparently the full dictionary is the parameter
-            params = self._copy_param(dict_param)
-        else:
-            params = {}
-            # Save our existing parameters as defaults to protect
-            # ourselves from losing values if we are called through an
-            # (erroneous) chain that builds a valid Message with
-            # arguments, and then does something like "msg % kwds"
-            # where kwds is an empty dictionary.
-            src = {}
-            if isinstance(self.params, dict):
-                src.update(self.params)
-            src.update(dict_param)
-            for key in keys:
-                params[key] = self._copy_param(src[key])
-
-        return params
-
     def _copy_param(self, param):
         try:
             return copy.deepcopy(param)
-        except TypeError:
+        except Exception:
             # Fallback to casting to unicode; this will handle the
             # python code-like objects that can't be deep-copied
             return six.text_type(param)
@@ -266,13 +304,14 @@ class Message(six.text_type):
     def __radd__(self, other):
         return self.__add__(other)

-    def __str__(self):
-        # NOTE(luisg): Logging in python 2.6 tries to str() log records,
-        # and it expects specifically a UnicodeError in order to proceed.
-        msg = _('Message objects do not support str() because they may '
-                'contain non-ascii characters. '
-                'Please use unicode() or translate() instead.')
-        raise UnicodeError(msg)
+    if six.PY2:
+        def __str__(self):
+            # NOTE(luisg): Logging in python 2.6 tries to str() log records,
+            # and it expects specifically a UnicodeError in order to proceed.
+            msg = _('Message objects do not support str() because they may '
+                    'contain non-ascii characters. '
+                    'Please use unicode() or translate() instead.')
+            raise UnicodeError(msg)


 def get_available_languages(domain):
@@ -315,8 +354,8 @@ def get_available_languages(domain):
                'zh_Hant_HK': 'zh_HK',
                'zh_Hant': 'zh_TW',
                'fil': 'tl_PH'}
-    for (locale, alias) in six.iteritems(aliases):
-        if locale in language_list and alias not in language_list:
+    for (locale_, alias) in six.iteritems(aliases):
+        if locale_ in language_list and alias not in language_list:
             language_list.append(alias)

     _AVAILABLE_LANGUAGES[domain] = language_list
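Note: Message now keeps a deep copy of every interpolation parameter instead of trimming them against the msgid, so a later lazy translation can re-interpolate safely. A hedged sketch of the lazy flow (message text and parameter are illustrative):

    from cloudbaseinit.openstack.common import gettextutils

    gettextutils.enable_lazy()
    params = {'id': 'vol-1'}
    msg = _('volume %(id)s failed') % params
    # msg is a Message; its parameters were deep-copied above, so mutating
    # the original dict now cannot change what is eventually rendered.
    params['id'] = 'vol-2'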
@@ -24,10 +24,10 @@ import traceback
 def import_class(import_str):
     """Returns a class from a string including module and class."""
     mod_str, _sep, class_str = import_str.rpartition('.')
-    __import__(mod_str)
     try:
+        __import__(mod_str)
         return getattr(sys.modules[mod_str], class_str)
-    except (ValueError, AttributeError):
+    except AttributeError:
         raise ImportError('Class %s cannot be found (%s)' %
                           (class_str,
                            traceback.format_exception(*sys.exc_info())))
@@ -58,6 +58,13 @@ def import_module(import_str):
     return sys.modules[import_str]


+def import_versioned_module(version, submodule=None):
+    module = 'cloudbaseinit.v%s' % version
+    if submodule:
+        module = '.'.join((module, submodule))
+    return import_module(module)
+
+
 def try_import(import_str, default=None):
     """Try to import a module and if it fails return default."""
     try:
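Note: import_versioned_module just builds a dotted path and delegates to import_module. A hedged sketch (the cloudbaseinit.v1.api module is illustrative; only packages that actually exist can be imported):

    from cloudbaseinit.openstack.common import importutils

    # Equivalent to: import cloudbaseinit.v1.api
    api = importutils.import_versioned_module('1', submodule='api')
    netaddr = importutils.try_import('netaddr')  # None if not installed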
@@ -31,25 +31,37 @@ This module provides a few things:
 '''


+import codecs
 import datetime
 import functools
 import inspect
 import itertools
-import json
-try:
-    import xmlrpclib
-except ImportError:
-    # NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3
-    #                 however the function and object call signatures
-    #                 remained the same. This whole try/except block should
-    #                 be removed and replaced with a call to six.moves once
-    #                 six 1.4.2 is released. See http://bit.ly/1bqrVzu
-    import xmlrpc.client as xmlrpclib
+import sys
+
+is_simplejson = False
+if sys.version_info < (2, 7):
+    # On Python <= 2.6, json module is not C boosted, so try to use
+    # simplejson module if available
+    try:
+        import simplejson as json
+        # NOTE(mriedem): Make sure we have a new enough version of simplejson
+        # to support the namedtuple_as_object argument. This can be removed
+        # in the Kilo release when python 2.6 support is dropped.
+        if 'namedtuple_as_object' in inspect.getargspec(json.dumps).args:
+            is_simplejson = True
+        else:
+            import json
+    except ImportError:
+        import json
+else:
+    import json

 import six
+import six.moves.xmlrpc_client as xmlrpclib

 from cloudbaseinit.openstack.common import gettextutils
 from cloudbaseinit.openstack.common import importutils
 from cloudbaseinit.openstack.common import strutils
 from cloudbaseinit.openstack.common import timeutils

 netaddr = importutils.try_import("netaddr")
@@ -161,15 +173,23 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,


 def dumps(value, default=to_primitive, **kwargs):
+    if is_simplejson:
+        kwargs['namedtuple_as_object'] = False
     return json.dumps(value, default=default, **kwargs)


-def loads(s):
-    return json.loads(s)
+def dump(obj, fp, *args, **kwargs):
+    if is_simplejson:
+        kwargs['namedtuple_as_object'] = False
+    return json.dump(obj, fp, *args, **kwargs)


-def load(s):
-    return json.load(s)
+def loads(s, encoding='utf-8', **kwargs):
+    return json.loads(strutils.safe_decode(s, encoding), **kwargs)
+
+
+def load(fp, encoding='utf-8', **kwargs):
+    return json.load(codecs.getreader(encoding)(fp), **kwargs)


 try:
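Note: loads() and load() now decode bytes explicitly instead of relying on the json module's guessing, and dumps() goes through to_primitive for rich objects. A hedged sketch:

    import datetime
    from cloudbaseinit.openstack.common import jsonutils

    data = jsonutils.loads(b'{"name": "cloudbase"}')  # bytes safe_decode'd first
    text = jsonutils.dumps({'when': datetime.datetime.utcnow()})
    # default=to_primitive converts the datetime before serialization.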
@@ -15,7 +15,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-"""Openstack logging handler.
+"""OpenStack logging handler.

 This module adds to logging functionality by adding the option to specify
 a context object when calling the various log methods. If the context object
@@ -33,7 +33,7 @@ import logging
 import logging.config
 import logging.handlers
 import os
-import re
+import socket
 import sys
 import traceback

@@ -41,31 +41,19 @@ from oslo.config import cfg
 import six
 from six import moves

+_PY26 = sys.version_info[0:2] == (2, 6)
+
 from cloudbaseinit.openstack.common.gettextutils import _
 from cloudbaseinit.openstack.common import importutils
 from cloudbaseinit.openstack.common import jsonutils
 from cloudbaseinit.openstack.common import local
+# NOTE(flaper87): Pls, remove when graduating this module
+# from the incubator.
+from cloudbaseinit.openstack.common.strutils import mask_password  # noqa


 _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

-_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
-
-# NOTE(ldbragst): Let's build a list of regex objects using the list of
-# _SANITIZE_KEYS we already have. This way, we only have to add the new key
-# to the list of _SANITIZE_KEYS and we can generate regular expressions
-# for XML and JSON automatically.
-_SANITIZE_PATTERNS = []
-_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
-                    r'(<%(key)s>).*?(</%(key)s>)',
-                    r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
-                    r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
-
-for key in _SANITIZE_KEYS:
-    for pattern in _FORMAT_PATTERNS:
-        reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
-        _SANITIZE_PATTERNS.append(reg_ex)
-

 common_cli_opts = [
     cfg.BoolOpt('debug',
@@ -84,14 +72,11 @@ logging_cli_opts = [
     cfg.StrOpt('log-config-append',
                metavar='PATH',
                deprecated_name='log-config',
-               help='The name of logging configuration file. It does not '
-                    'disable existing loggers, but just appends specified '
-                    'logging configuration to any other existing logging '
-                    'options. Please see the Python logging module '
-                    'documentation for details on logging configuration '
-                    'files.'),
+               help='The name of a logging configuration file. This file '
+                    'is appended to any existing logging configuration '
+                    'files. For details about logging configuration files, '
+                    'see the Python logging module documentation.'),
     cfg.StrOpt('log-format',
                default=None,
                metavar='FORMAT',
                help='DEPRECATED. '
                     'A logging.Formatter log message format string which may '
@@ -103,7 +88,7 @@ logging_cli_opts = [
                default=_DEFAULT_LOG_DATE_FORMAT,
                metavar='DATE_FORMAT',
                help='Format string for %%(asctime)s in log records. '
-                    'Default: %(default)s'),
+                    'Default: %(default)s .'),
     cfg.StrOpt('log-file',
                metavar='PATH',
                deprecated_name='logfile',
@@ -112,79 +97,78 @@ logging_cli_opts = [
     cfg.StrOpt('log-dir',
                deprecated_name='logdir',
                help='(Optional) The base directory used for relative '
-                    '--log-file paths'),
+                    '--log-file paths.'),
     cfg.BoolOpt('use-syslog',
                 default=False,
                 help='Use syslog for logging. '
                      'Existing syslog format is DEPRECATED during I, '
-                     'and then will be changed in J to honor RFC5424'),
+                     'and will change in J to honor RFC5424.'),
     cfg.BoolOpt('use-syslog-rfc-format',
                 # TODO(bogdando) remove or use True after existing
                 # syslog format deprecation in J
                 default=False,
-                help='(Optional) Use syslog rfc5424 format for logging. '
-                     'If enabled, will add APP-NAME (RFC5424) before the '
-                     'MSG part of the syslog message. The old format '
-                     'without APP-NAME is deprecated in I, '
+                help='(Optional) Enables or disables syslog rfc5424 format '
+                     'for logging. If enabled, prefixes the MSG part of the '
+                     'syslog message with APP-NAME (RFC5424). The '
+                     'format without the APP-NAME is deprecated in I, '
                      'and will be removed in J.'),
     cfg.StrOpt('syslog-log-facility',
                default='LOG_USER',
-               help='Syslog facility to receive log lines')
+               help='Syslog facility to receive log lines.')
 ]

 generic_log_opts = [
     cfg.BoolOpt('use_stderr',
                 default=True,
-                help='Log output to standard error')
+                help='Log output to standard error.')
 ]

+DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'boto=WARN',
+                      'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO',
+                      'oslo.messaging=INFO', 'iso8601=WARN',
+                      'requests.packages.urllib3.connectionpool=WARN',
+                      'urllib3.connectionpool=WARN', 'websocket=WARN',
+                      "keystonemiddleware=WARN", "routes.middleware=WARN",
+                      "stevedore=WARN"]
+
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
                        '%(name)s [%(request_id)s %(user_identity)s] '
                        '%(instance)s%(message)s',
-               help='Format string to use for log messages with context'),
+               help='Format string to use for log messages with context.'),
     cfg.StrOpt('logging_default_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
                        '%(name)s [-] %(instance)s%(message)s',
-               help='Format string to use for log messages without context'),
+               help='Format string to use for log messages without context.'),
     cfg.StrOpt('logging_debug_format_suffix',
                default='%(funcName)s %(pathname)s:%(lineno)d',
-               help='Data to append to log format when level is DEBUG'),
+               help='Data to append to log format when level is DEBUG.'),
     cfg.StrOpt('logging_exception_prefix',
                default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
                        '%(instance)s',
-               help='Prefix each line of exception output with this format'),
+               help='Prefix each line of exception output with this format.'),
     cfg.ListOpt('default_log_levels',
-                default=[
-                    'amqp=WARN',
-                    'amqplib=WARN',
-                    'boto=WARN',
-                    'qpid=WARN',
-                    'sqlalchemy=WARN',
-                    'suds=INFO',
-                    'iso8601=WARN',
-                    'requests.packages.urllib3.connectionpool=WARN'
-                ],
-                help='List of logger=LEVEL pairs'),
+                default=DEFAULT_LOG_LEVELS,
+                help='List of logger=LEVEL pairs.'),
     cfg.BoolOpt('publish_errors',
                 default=False,
-                help='Publish error events'),
+                help='Enables or disables publication of error events.'),
     cfg.BoolOpt('fatal_deprecations',
                 default=False,
-                help='Make deprecations fatal'),
+                help='Enables or disables fatal status of deprecations.'),

     # NOTE(mikal): there are two options here because sometimes we are handed
     # a full instance (and could include more information), and other times we
     # are just handed a UUID for the instance.
     cfg.StrOpt('instance_format',
                default='[instance: %(uuid)s] ',
-               help='If an instance is passed with the log message, format '
-                    'it like this'),
+               help='The format for an instance that is passed with the log '
+                    'message.'),
     cfg.StrOpt('instance_uuid_format',
                default='[instance: %(uuid)s] ',
-               help='If an instance UUID is passed with the log message, '
-                    'format it like this'),
+               help='The format for an instance UUID that is passed with the '
+                    'log message.'),
 ]

 CONF = cfg.CONF
@@ -243,45 +227,20 @@ def _get_log_file_path(binary=None):
     return None


-def mask_password(message, secret="***"):
-    """Replace password with 'secret' in message.
-
-    :param message: The string which includes security information.
-    :param secret: value with which to replace passwords.
-    :returns: The unicode value of message with the password fields masked.
-
-    For example:
-
-    >>> mask_password("'adminPass' : 'aaaaa'")
-    "'adminPass' : '***'"
-    >>> mask_password("'admin_pass' : 'aaaaa'")
-    "'admin_pass' : '***'"
-    >>> mask_password('"password" : "aaaaa"')
-    '"password" : "***"'
-    >>> mask_password("'original_password' : 'aaaaa'")
-    "'original_password' : '***'"
-    >>> mask_password("u'original_password' : u'aaaaa'")
-    "u'original_password' : u'***'"
-    """
-    message = six.text_type(message)
-
-    # NOTE(ldbragst): Check to see if anything in message contains any key
-    # specified in _SANITIZE_KEYS, if not then just return the message since
-    # we don't have to mask any passwords.
-    if not any(key in message for key in _SANITIZE_KEYS):
-        return message
-
-    secret = r'\g<1>' + secret + r'\g<2>'
-    for pattern in _SANITIZE_PATTERNS:
-        message = re.sub(pattern, secret, message)
-    return message
-
-
 class BaseLoggerAdapter(logging.LoggerAdapter):

     def audit(self, msg, *args, **kwargs):
         self.log(logging.AUDIT, msg, *args, **kwargs)

+    def isEnabledFor(self, level):
+        if _PY26:
+            # This method was added in python 2.7 (and it does the exact
+            # same logic, so we need to do the exact same logic so that
+            # python 2.6 has this capability as well).
+            return self.logger.isEnabledFor(level)
+        else:
+            return super(BaseLoggerAdapter, self).isEnabledFor(level)
+

 class LazyAdapter(BaseLoggerAdapter):
     def __init__(self, name='unknown', version='unknown'):
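Note: mask_password itself did not go away; it moved to strutils and is re-imported above for backward compatibility. A short sketch of the unchanged behaviour, taken from the doctests being removed here:

    from cloudbaseinit.openstack.common.strutils import mask_password

    mask_password("'adminPass' : 'aaaaa'")        # "'adminPass' : '***'"
    mask_password('"password" : "aaaaa"', '###')  # '"password" : "###"'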
@@ -294,6 +253,11 @@ class LazyAdapter(BaseLoggerAdapter):
     def logger(self):
         if not self._logger:
             self._logger = getLogger(self.name, self.version)
+            if six.PY3:
+                # In Python 3, the code fails because the 'manager' attribute
+                # cannot be found when using a LoggerAdapter as the
+                # underlying logger. Work around this issue.
+                self._logger.manager = self._logger.logger.manager
         return self._logger


@@ -304,25 +268,45 @@ class ContextAdapter(BaseLoggerAdapter):
         self.logger = logger
         self.project = project_name
         self.version = version_string
+        self._deprecated_messages_sent = dict()

     @property
     def handlers(self):
         return self.logger.handlers

     def deprecated(self, msg, *args, **kwargs):
+        """Call this method when a deprecated feature is used.
+
+        If the system is configured for fatal deprecations then the message
+        is logged at the 'critical' level and :class:`DeprecatedConfig` will
+        be raised.
+
+        Otherwise, the message will be logged (once) at the 'warn' level.
+
+        :raises: :class:`DeprecatedConfig` if the system is configured for
+                 fatal deprecations.
+
+        """
         stdmsg = _("Deprecated: %s") % msg
         if CONF.fatal_deprecations:
             self.critical(stdmsg, *args, **kwargs)
             raise DeprecatedConfig(msg=stdmsg)
-        else:
-            self.warn(stdmsg, *args, **kwargs)
+
+        # Using a list because a tuple with dict can't be stored in a set.
+        sent_args = self._deprecated_messages_sent.setdefault(msg, list())
+
+        if args in sent_args:
+            # Already logged this message, so don't log it again.
+            return
+
+        sent_args.append(args)
+        self.warn(stdmsg, *args, **kwargs)

     def process(self, msg, kwargs):
-        # NOTE(mrodden): catch any Message/other object and
-        #                coerce to unicode before they can get
-        #                to the python logging and possibly
-        #                cause string encoding trouble
-        if not isinstance(msg, six.string_types):
+        # NOTE(jecarey): If msg is not unicode, coerce it into unicode
+        #                before it can get to the python logging and
+        #                possibly cause string encoding trouble
+        if not isinstance(msg, six.text_type):
             msg = six.text_type(msg)

         if 'extra' not in kwargs:
@@ -336,7 +320,7 @@ class ContextAdapter(BaseLoggerAdapter):
             extra.update(_dictify_context(context))

         instance = kwargs.pop('instance', None)
-        instance_uuid = (extra.get('instance_uuid', None) or
+        instance_uuid = (extra.get('instance_uuid') or
                          kwargs.pop('instance_uuid', None))
         instance_extra = ''
         if instance:
@@ -402,9 +386,7 @@ class JSONFormatter(logging.Formatter):

 def _create_logging_excepthook(product_name):
     def logging_excepthook(exc_type, value, tb):
-        extra = {}
-        if CONF.verbose or CONF.debug:
-            extra['exc_info'] = (exc_type, value, tb)
+        extra = {'exc_info': (exc_type, value, tb)}
         getLogger(product_name).critical(
             "".join(traceback.format_exception_only(exc_type, value)),
             **extra)
@@ -428,8 +410,8 @@ def _load_log_config(log_config_append):
     try:
         logging.config.fileConfig(log_config_append,
                                   disable_existing_loggers=False)
-    except moves.configparser.Error as exc:
-        raise LogConfigError(log_config_append, str(exc))
+    except (moves.configparser.Error, KeyError) as exc:
+        raise LogConfigError(log_config_append, six.text_type(exc))


 def setup(product_name, version='unknown'):
@@ -441,10 +423,20 @@ def setup(product_name, version='unknown'):
     sys.excepthook = _create_logging_excepthook(product_name)


-def set_defaults(logging_context_format_string):
-    cfg.set_defaults(log_opts,
-                     logging_context_format_string=
-                     logging_context_format_string)
+def set_defaults(logging_context_format_string=None,
+                 default_log_levels=None):
+    # Just in case the caller is not setting the
+    # default_log_levels. This is insurance because
+    # we introduced the default_log_levels parameter
+    # later in a backwards-incompatible change
+    if default_log_levels is not None:
+        cfg.set_defaults(
+            log_opts,
+            default_log_levels=default_log_levels)
+    if logging_context_format_string is not None:
+        cfg.set_defaults(
+            log_opts,
+            logging_context_format_string=logging_context_format_string)


 def _find_facility_from_conf():
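Note: both set_defaults arguments are now optional, so a service can override either default independently before config parsing. A hedged usage sketch (the level list is illustrative):

    from cloudbaseinit.openstack.common import log as logging

    logging.set_defaults(
        default_log_levels=['amqp=WARN', 'urllib3.connectionpool=WARN'])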
@@ -474,10 +466,16 @@ def _find_facility_from_conf():
 class RFCSysLogHandler(logging.handlers.SysLogHandler):
     def __init__(self, *args, **kwargs):
         self.binary_name = _get_binary_name()
-        super(RFCSysLogHandler, self).__init__(*args, **kwargs)
+        # Do not use super() unless type(logging.handlers.SysLogHandler)
+        # is 'type' (Python 2.7).
+        # Use old style calls, if the type is 'classobj' (Python 2.6)
+        logging.handlers.SysLogHandler.__init__(self, *args, **kwargs)

     def format(self, record):
-        msg = super(RFCSysLogHandler, self).format(record)
+        # Do not use super() unless type(logging.handlers.SysLogHandler)
+        # is 'type' (Python 2.7).
+        # Use old style calls, if the type is 'classobj' (Python 2.6)
+        msg = logging.handlers.SysLogHandler.format(self, record)
         msg = self.binary_name + ' ' + msg
         return msg

@@ -487,18 +485,6 @@ def _setup_logging_from_conf(project, version):
     for handler in log_root.handlers:
         log_root.removeHandler(handler)

-    if CONF.use_syslog:
-        facility = _find_facility_from_conf()
-        # TODO(bogdando) use the format provided by RFCSysLogHandler
-        #   after existing syslog format deprecation in J
-        if CONF.use_syslog_rfc_format:
-            syslog = RFCSysLogHandler(address='/dev/log',
-                                      facility=facility)
-        else:
-            syslog = logging.handlers.SysLogHandler(address='/dev/log',
-                                                    facility=facility)
-        log_root.addHandler(syslog)
-
     logpath = _get_log_file_path()
     if logpath:
         filelog = logging.handlers.WatchedFileHandler(logpath)
@@ -515,9 +501,14 @@ def _setup_logging_from_conf(project, version):
         log_root.addHandler(streamlog)

     if CONF.publish_errors:
-        handler = importutils.import_object(
-            "cloudbaseinit.openstack.common.log_handler.PublishErrorsHandler",
-            logging.ERROR)
+        try:
+            handler = importutils.import_object(
+                "cloudbaseinit.openstack.common.log_handler.PublishErrorsHandler",
+                logging.ERROR)
+        except ImportError:
+            handler = importutils.import_object(
+                "oslo.messaging.notify.log_handler.PublishErrorsHandler",
+                logging.ERROR)
         log_root.addHandler(handler)

     datefmt = CONF.log_date_format
@@ -543,9 +534,29 @@ def _setup_logging_from_conf(project, version):

     for pair in CONF.default_log_levels:
         mod, _sep, level_name = pair.partition('=')
-        level = logging.getLevelName(level_name)
         logger = logging.getLogger(mod)
-        logger.setLevel(level)
+        # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name
+        # to integer code.
+        if sys.version_info < (2, 7):
+            level = logging.getLevelName(level_name)
+            logger.setLevel(level)
+        else:
+            logger.setLevel(level_name)
+
+    if CONF.use_syslog:
+        try:
+            facility = _find_facility_from_conf()
+            # TODO(bogdando) use the format provided by RFCSysLogHandler
+            #   after existing syslog format deprecation in J
+            if CONF.use_syslog_rfc_format:
+                syslog = RFCSysLogHandler(facility=facility)
+            else:
+                syslog = logging.handlers.SysLogHandler(facility=facility)
+            log_root.addHandler(syslog)
+        except socket.error:
+            log_root.error('Unable to add syslog handler. Verify that syslog '
+                           'is running.')


 _loggers = {}
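Note: on Python 2.7+ Logger.setLevel accepts a level name string directly; the getLevelName round-trip is only needed on 2.6, which is what the branch above encodes. A small illustration of the equivalence:

    import logging

    logger = logging.getLogger('amqp')
    logger.setLevel('WARN')                        # 2.7+: accepts the name
    logger.setLevel(logging.getLevelName('WARN'))  # 2.6: name -> numeric 30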
@@ -615,6 +626,12 @@ class ContextFormatter(logging.Formatter):
     def format(self, record):
         """Uses contextstring if request_id is set, otherwise default."""

+        # NOTE(jecarey): If msg is not unicode, coerce it into unicode
+        #                before it can get to the python logging and
+        #                possibly cause string encoding trouble
+        if not isinstance(record.msg, six.text_type):
+            record.msg = six.text_type(record.msg)
+
         # store project info
         record.project = self.project
         record.version = self.version
@@ -629,19 +646,24 @@ class ContextFormatter(logging.Formatter):
         # NOTE(sdague): default the fancier formatting params
         # to an empty string so we don't throw an exception if
         # they get used
-        for key in ('instance', 'color'):
+        for key in ('instance', 'color', 'user_identity'):
             if key not in record.__dict__:
                 record.__dict__[key] = ''

-        if record.__dict__.get('request_id', None):
-            self._fmt = CONF.logging_context_format_string
+        if record.__dict__.get('request_id'):
+            fmt = CONF.logging_context_format_string
         else:
-            self._fmt = CONF.logging_default_format_string
+            fmt = CONF.logging_default_format_string

         if (record.levelno == logging.DEBUG and
                 CONF.logging_debug_format_suffix):
-            self._fmt += " " + CONF.logging_debug_format_suffix
+            fmt += " " + CONF.logging_debug_format_suffix
+
+        if sys.version_info < (3, 2):
+            self._fmt = fmt
+        else:
+            self._style = logging.PercentStyle(fmt)
+            self._fmt = self._style._fmt

         # Cache this on the record, Logger will respect our formatted copy
         if record.exc_info:
             record.exc_text = self.formatException(record.exc_info, record)
@@ -16,31 +16,36 @@
 # under the License.

 import sys
+import time

 from eventlet import event
 from eventlet import greenthread

-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _LE, _LW
 from cloudbaseinit.openstack.common import log as logging
-from cloudbaseinit.openstack.common import timeutils

 LOG = logging.getLogger(__name__)

+# NOTE(zyluo): This lambda function was declared to avoid mocking collisions
+#              with time.time() called in the standard logging module
+#              during unittests.
+_ts = lambda: time.time()
+

 class LoopingCallDone(Exception):
-    """Exception to break out and stop a LoopingCall.
+    """Exception to break out and stop a LoopingCallBase.

-    The poll-function passed to LoopingCall can raise this exception to
+    The poll-function passed to LoopingCallBase can raise this exception to
     break out of the loop normally. This is somewhat analogous to
     StopIteration.

     An optional return-value can be included as the argument to the exception;
-    this return-value will be returned by LoopingCall.wait()
+    this return-value will be returned by LoopingCallBase.wait()

     """

     def __init__(self, retvalue=True):
-        """:param retvalue: Value that LoopingCall.wait() should return."""
+        """:param retvalue: Value that LoopingCallBase.wait() should return."""
         self.retvalue = retvalue
@@ -72,21 +77,22 @@ class FixedIntervalLoopingCall(LoopingCallBase):

             try:
                 while self._running:
-                    start = timeutils.utcnow()
+                    start = _ts()
                     self.f(*self.args, **self.kw)
-                    end = timeutils.utcnow()
+                    end = _ts()
                     if not self._running:
                         break
-                    delay = interval - timeutils.delta_seconds(start, end)
-                    if delay <= 0:
-                        LOG.warn(_('task run outlasted interval by %s sec') %
-                                 -delay)
-                    greenthread.sleep(delay if delay > 0 else 0)
+                    delay = end - start - interval
+                    if delay > 0:
+                        LOG.warn(_LW('task %(func_name)s run outlasted '
+                                     'interval by %(delay).2f sec'),
+                                 {'func_name': repr(self.f), 'delay': delay})
+                    greenthread.sleep(-delay if delay < 0 else 0)
             except LoopingCallDone as e:
                 self.stop()
                 done.send(e.retvalue)
             except Exception:
-                LOG.exception(_('in fixed duration looping call'))
+                LOG.exception(_LE('in fixed duration looping call'))
                 done.send_exception(*sys.exc_info())
                 return
             else:
@@ -98,11 +104,6 @@ class FixedIntervalLoopingCall(LoopingCallBase):
         return self.done


-# TODO(mikal): this class name is deprecated in Havana and should be removed
-# in the I release
-LoopingCall = FixedIntervalLoopingCall
-
-
 class DynamicLoopingCall(LoopingCallBase):
     """A looping call which sleeps until the next known event.

@@ -126,14 +127,15 @@ class DynamicLoopingCall(LoopingCallBase):

                     if periodic_interval_max is not None:
                         idle = min(idle, periodic_interval_max)
-                    LOG.debug(_('Dynamic looping call sleeping for %.02f '
-                                'seconds'), idle)
+                    LOG.debug('Dynamic looping call %(func_name)s sleeping '
+                              'for %(idle).02f seconds',
+                              {'func_name': repr(self.f), 'idle': idle})
                     greenthread.sleep(idle)
             except LoopingCallDone as e:
                 self.stop()
                 done.send(e.retvalue)
             except Exception:
-                LOG.exception(_('in dynamic looping call'))
+                LOG.exception(_LE('in dynamic looping call'))
                 done.send_exception(*sys.exc_info())
                 return
             else:
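Note: with the deprecated LoopingCall alias removed, callers use FixedIntervalLoopingCall directly. A hedged usage sketch (the polled function and predicate are illustrative):

    from cloudbaseinit.openstack.common import loopingcall

    def poll():
        if work_is_done():  # hypothetical predicate
            raise loopingcall.LoopingCallDone(retvalue=True)

    timer = loopingcall.FixedIntervalLoopingCall(poll)
    result = timer.start(interval=2.0).wait()  # returns the retvalue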
@@ -17,7 +17,14 @@
 Network-related utilities and helper functions.
 """

-from cloudbaseinit.openstack.common.py3kcompat import urlutils
+import logging
+import socket
+
+from six.moves.urllib import parse
+
+from cloudbaseinit.openstack.common.gettextutils import _LW
+
+LOG = logging.getLogger(__name__)


 def parse_host_port(address, default_port=None):
@@ -42,8 +49,12 @@ def parse_host_port(address, default_port=None):
     ('::1', 1234)
     >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
     ('2001:db8:85a3::8a2e:370:7334', 1234)

+    >>> parse_host_port(None)
+    (None, None)
     """
+    if not address:
+        return (None, None)
+
     if address[0] == '[':
         # Escaped ipv6
         _host, _port = address[1:].split(']')
@@ -64,16 +75,89 @@ def parse_host_port(address, default_port=None):
     return (host, None if port is None else int(port))


+class ModifiedSplitResult(parse.SplitResult):
+    """Split results class for urlsplit."""
+
+    # NOTE(dims): The functions below are needed for Python 2.6.x.
+    # We can remove these when we drop support for 2.6.x.
+    @property
+    def hostname(self):
+        netloc = self.netloc.split('@', 1)[-1]
+        host, port = parse_host_port(netloc)
+        return host
+
+    @property
+    def port(self):
+        netloc = self.netloc.split('@', 1)[-1]
+        host, port = parse_host_port(netloc)
+        return port
+
+
 def urlsplit(url, scheme='', allow_fragments=True):
     """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
     This function papers over Python issue9374 when needed.

     The parameters are the same as urlparse.urlsplit.
     """
-    scheme, netloc, path, query, fragment = urlutils.urlsplit(
+    scheme, netloc, path, query, fragment = parse.urlsplit(
         url, scheme, allow_fragments)
     if allow_fragments and '#' in path:
         path, fragment = path.split('#', 1)
     if '?' in path:
         path, query = path.split('?', 1)
-    return urlutils.SplitResult(scheme, netloc, path, query, fragment)
+    return ModifiedSplitResult(scheme, netloc,
+                               path, query, fragment)
+
+
+def set_tcp_keepalive(sock, tcp_keepalive=True,
+                      tcp_keepidle=None,
+                      tcp_keepalive_interval=None,
+                      tcp_keepalive_count=None):
+    """Set values for tcp keepalive parameters
+
+    This function configures tcp keepalive parameters if users wish to do
+    so.
+
+    :param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are
+      not sure, this should be True, and default values will be used.
+
+    :param tcp_keepidle: time to wait before starting to send keepalive probes
+    :param tcp_keepalive_interval: time between successive probes, once the
+      initial wait time is over
+    :param tcp_keepalive_count: number of probes to send before the connection
+      is killed
+    """
+
+    # NOTE(praneshp): Despite keepalive being a tcp concept, the level is
+    # still SOL_SOCKET. This is a quirk.
+    if isinstance(tcp_keepalive, bool):
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive)
+    else:
+        raise TypeError("tcp_keepalive must be a boolean")
+
+    if not tcp_keepalive:
+        return
+
+    # These options aren't available in the OS X version of eventlet,
+    # Idle + Count * Interval effectively gives you the total timeout.
+    if tcp_keepidle is not None:
+        if hasattr(socket, 'TCP_KEEPIDLE'):
+            sock.setsockopt(socket.IPPROTO_TCP,
+                            socket.TCP_KEEPIDLE,
+                            tcp_keepidle)
+        else:
+            LOG.warning(_LW('tcp_keepidle not available on your system'))
+    if tcp_keepalive_interval is not None:
+        if hasattr(socket, 'TCP_KEEPINTVL'):
+            sock.setsockopt(socket.IPPROTO_TCP,
+                            socket.TCP_KEEPINTVL,
+                            tcp_keepalive_interval)
+        else:
+            LOG.warning(_LW('tcp_keepintvl not available on your system'))
+    if tcp_keepalive_count is not None:
+        if hasattr(socket, 'TCP_KEEPCNT'):
+            sock.setsockopt(socket.IPPROTO_TCP,
+                            socket.TCP_KEEPCNT,
+                            tcp_keepalive_count)
+        else:
+            LOG.warning(_LW('tcp_keepcnt not available on your system'))
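Note: as the comment above puts it, Idle + Count * Interval gives the total time before a dead peer is detected. A hedged usage sketch (the numbers are illustrative):

    import socket
    from cloudbaseinit.openstack.common import network_utils

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Probe after 60s idle, then every 10s, give up after 5 misses (~110s).
    network_utils.set_tcp_keepalive(sock, tcp_keepalive=True,
                                    tcp_keepidle=60,
                                    tcp_keepalive_interval=10,
                                    tcp_keepalive_count=5)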
@@ -19,7 +19,7 @@ import uuid
 from oslo.config import cfg

 from cloudbaseinit.openstack.common import context
-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _, _LE
 from cloudbaseinit.openstack.common import importutils
 from cloudbaseinit.openstack.common import jsonutils
 from cloudbaseinit.openstack.common import log as logging
@@ -36,7 +36,6 @@ notifier_opts = [
                default='INFO',
                help='Default notification level for outgoing notifications'),
     cfg.StrOpt('default_publisher_id',
-               default=None,
                help='Default publisher_id for outgoing notifications'),
 ]

@@ -142,9 +141,9 @@ def notify(context, publisher_id, event_type, priority, payload):
         try:
             driver.notify(context, msg)
         except Exception as e:
-            LOG.exception(_("Problem '%(e)s' attempting to "
-                            "send to notification system. "
-                            "Payload=%(payload)s")
+            LOG.exception(_LE("Problem '%(e)s' attempting to "
+                              "send to notification system. "
+                              "Payload=%(payload)s")
                           % dict(e=e, payload=payload))


@@ -161,8 +160,8 @@ def _get_drivers():
             driver = importutils.import_module(notification_driver)
             _drivers[notification_driver] = driver
         except ImportError:
-            LOG.exception(_("Failed to load notifier %s. "
-                            "These notifications will not be sent.") %
+            LOG.exception(_LE("Failed to load notifier %s. "
+                              "These notifications will not be sent.") %
                           notification_driver)
     return _drivers.values()
@@ -13,11 +13,10 @@
 # under the License.

 """
-A temporary helper which emulates cloudbaseinit.messaging.Notifier.
+A temporary helper which emulates oslo.messaging.Notifier.

 This helper method allows us to do the tedious porting to the new Notifier API
-as a standalone commit so that the commit which switches us to
-cloudbaseinit.messaging
+as a standalone commit so that the commit which switches us to oslo.messaging
 is smaller and easier to review. This file will be removed as part of that
 commit.
 """
@@ -16,7 +16,7 @@
 from oslo.config import cfg

 from cloudbaseinit.openstack.common import context as req_context
-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _LE
 from cloudbaseinit.openstack.common import log as logging
 from cloudbaseinit.openstack.common import rpc

@@ -42,6 +42,6 @@ def notify(context, message):
     try:
         rpc.notify(context, topic, message)
     except Exception:
-        LOG.exception(_("Could not send notification to %(topic)s. "
-                        "Payload=%(message)s"),
+        LOG.exception(_LE("Could not send notification to %(topic)s. "
+                          "Payload=%(message)s"),
                       {"topic": topic, "message": message})
@@ -18,7 +18,7 @@
 from oslo.config import cfg

 from cloudbaseinit.openstack.common import context as req_context
-from cloudbaseinit.openstack.common.gettextutils import _
+from cloudbaseinit.openstack.common.gettextutils import _LE
 from cloudbaseinit.openstack.common import log as logging
 from cloudbaseinit.openstack.common import rpc

@@ -48,6 +48,6 @@ def notify(context, message):
     try:
         rpc.notify(context, topic, message, envelope=True)
     except Exception:
-        LOG.exception(_("Could not send notification to %(topic)s. "
-                        "Payload=%(message)s"),
+        LOG.exception(_LE("Could not send notification to %(topic)s. "
+                          "Payload=%(message)s"),
                       {"topic": topic, "message": message})
@ -37,7 +37,7 @@ import six
|
||||
|
||||
|
||||
from cloudbaseinit.openstack.common import excutils
|
||||
from cloudbaseinit.openstack.common.gettextutils import _
|
||||
from cloudbaseinit.openstack.common.gettextutils import _, _LE
|
||||
from cloudbaseinit.openstack.common import local
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
from cloudbaseinit.openstack.common.rpc import common as rpc_common
|
||||
@ -72,7 +72,7 @@ class Pool(pools.Pool):
|
||||
|
||||
# TODO(comstud): Timeout connections not used in a while
|
||||
def create(self):
|
||||
LOG.debug(_('Pool creating new connection'))
|
||||
LOG.debug('Pool creating new connection')
|
||||
return self.connection_cls(self.conf)
|
||||
|
||||
def empty(self):
|
||||
@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection):
|
||||
ack_on_error)
|
||||
|
||||
def consume_in_thread(self):
|
||||
self.connection.consume_in_thread()
|
||||
return self.connection.consume_in_thread()
|
||||
|
||||
def __getattr__(self, key):
|
||||
"""Proxy all other calls to the Connection instance."""
|
||||
@ -202,7 +202,7 @@ class ReplyProxy(ConnectionContext):
|
||||
LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
|
||||
', message : %(data)s'), {'msg_id': msg_id,
|
||||
'data': message_data})
|
||||
LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
|
||||
LOG.warn(_('_call_waiters: %s') % self._call_waiters)
|
||||
else:
|
||||
waiter.put(message_data)
|
||||
|
||||
@ -287,7 +287,7 @@ def unpack_context(conf, msg):
|
||||
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
return ctx


@ -339,7 +339,7 @@ def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
LOG.debug('UNIQUE_ID is %s.' % (unique_id))


class _ThreadPoolWithWait(object):

@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait):
# the previous context is stored in local.store.context
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
rpc_common._safe_log(LOG.debug, 'received %s', message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')

@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait):
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
LOG.debug('Expected exception during message handling (%s)' %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,

@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait):
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
LOG.error(_LE('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)

@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore()

def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
LOG.debug(_('Making synchronous call on %s ...'), topic)
LOG.debug('Making synchronous call on %s ...', topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
LOG.debug('MSG_ID is %s' % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)

@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):

def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
LOG.debug('Making asynchronous cast on %s...', topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:

@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool):

def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
LOG.debug('Making asynchronous fanout cast...')
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:

@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,

def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug('Sending %(event_type)s on %(topic)s',
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
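
For orientation, a minimal, hypothetical usage sketch of the send-side primitives these hunks touch; conf, context and pool stand in for objects configured elsewhere and the 'compute' topic is illustrative only:

def send_examples(conf, context, pool):
    # cast: fire-and-forget to one consumer of the topic
    cast(conf, context, 'compute', {'method': 'ping', 'args': {}}, pool)

    # multicall: synchronous; may yield several replies before 'ending'
    for reply in multicall(conf, context, 'compute',
                           {'method': 'get_stats', 'args': {}},
                           timeout=30, connection_pool=pool):
        print(reply)
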

@ -22,7 +22,7 @@ import traceback
from oslo.config import cfg
import six

from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _, _LE
from cloudbaseinit.openstack.common import importutils
from cloudbaseinit.openstack.common import jsonutils
from cloudbaseinit.openstack.common import local

@ -50,7 +50,7 @@ deserialize_msg().
The current message format (version 2.0) is very simple. It is::

{
'cloudbaseinit.version': <RPC Envelope Version as a String>,
'oslo.version': <RPC Envelope Version as a String>,
'cloudbaseinit.message': <Application Message Payload, JSON encoded>
}

@ -66,7 +66,7 @@ which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''

_VERSION_KEY = 'cloudbaseinit.version'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'cloudbaseinit.message'

_REMOTE_POSTFIX = '_Remote'
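
To make the envelope concrete, a hypothetical version-2.0 message built with these keys; jsonutils is assumed to be json-compatible, so the stdlib json is used here:

import json

raw_msg = {'method': 'echo', 'args': {'value': 42}}
envelope = {
    'oslo.version': '2.0',
    'cloudbaseinit.message': json.dumps(raw_msg),
}
# deserialize_msg() on the receiving side reverses the wrapping:
assert json.loads(envelope['cloudbaseinit.message']) == raw_msg
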

@ -85,7 +85,7 @@ class RPCException(Exception):
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
LOG.exception(_LE('Exception in string format operation'))
for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened

@ -289,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"),
LOG.error(_LE("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)


@ -140,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None):
if not method:
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
version = msg.get('version')
namespace = msg.get('namespace')

try:
consumer = CONSUMERS[topic][0]

@ -185,8 +185,8 @@ def fanout_cast(conf, context, topic, msg):
if not method:
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
version = msg.get('version')
namespace = msg.get('namespace')

for consumer in CONSUMERS.get(topic, []):
try:


@ -29,7 +29,7 @@ from oslo.config import cfg
import six

from cloudbaseinit.openstack.common import excutils
from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI
from cloudbaseinit.openstack.common import network_utils
from cloudbaseinit.openstack.common.rpc import amqp as rpc_amqp
from cloudbaseinit.openstack.common.rpc import common as rpc_common

@ -50,8 +50,12 @@ kombu_opts = [
help='SSL cert file (valid only if SSL enabled)'),
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
'(valid only if SSL enabled)')),
help='SSL certification authority file '
'(valid only if SSL enabled)'),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
cfg.StrOpt('rabbit_host',
default='localhost',
help='The RabbitMQ broker address where a single node is used'),

@ -153,12 +157,12 @@ class ConsumerBase(object):
callback(msg)
except Exception:
if self.ack_on_error:
LOG.exception(_("Failed to process message"
" ... skipping it."))
LOG.exception(_LE("Failed to process message"
" ... skipping it."))
message.ack()
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
LOG.exception(_LE("Failed to process message"
" ... will requeue."))
message.requeue()
else:
message.ack()

@ -458,6 +462,9 @@ class Connection(object):

self.params_list = params_list

brokers_count = len(self.params_list)
self.next_broker_indices = itertools.cycle(range(brokers_count))

self.memory_transport = self.conf.fake_rabbit

self.connection = None
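
The itertools.cycle added here gives straightforward round-robin broker selection. A standalone sketch of the idea; the broker names are placeholders:

import itertools

params_list = [{'hostname': 'rabbit1'}, {'hostname': 'rabbit2'}]
next_broker_indices = itertools.cycle(range(len(params_list)))

for _ in range(4):
    params = params_list[next(next_broker_indices)]
    print(params['hostname'])  # rabbit1, rabbit2, rabbit1, rabbit2
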

@ -492,9 +499,20 @@ class Connection(object):
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
LOG.info(_LI("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
# connection can hang "indefinitely" somewhere deep down
# in Kombu. Blocking the thread for a bit prior to
# release seems to kludge around the problem where it is
# otherwise reproducible.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)

self.connection.release()
except self.connection_errors:
pass

@ -514,7 +532,7 @@ class Connection(object):
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
params)

def reconnect(self):

@ -528,7 +546,7 @@ class Connection(object):

attempt = 0
while True:
params = self.params_list[attempt % len(self.params_list)]
params = self.params_list[next(self.next_broker_indices)]
attempt += 1
try:
self._connect(params)

@ -546,7 +564,7 @@ class Connection(object):
raise

log_info = {}
log_info['err_str'] = str(e)
log_info['err_str'] = e
log_info['max_retries'] = self.max_retries
log_info.update(params)

@ -565,9 +583,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max)

log_info['sleep_time'] = sleep_time
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)

def ensure(self, error_callback, method, *args, **kwargs):

@ -618,8 +636,8 @@ class Connection(object):
"""

def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
log_info = {'topic': topic, 'err_str': exc}
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)

def _declare_consumer():

@ -637,12 +655,12 @@ class Connection(object):

def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug('Timed out waiting for RPC response: %s' %
exc)
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
LOG.exception(_LE('Failed to consume message from queue: %s') %
exc)
info['do_consume'] = True

def _consume():

@ -679,8 +697,8 @@ class Connection(object):
"""Send to a publisher based on the publisher class."""

def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
log_info = {'topic': topic, 'err_str': exc}
LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)

def _publish():


@ -23,7 +23,7 @@ from oslo.config import cfg
import six

from cloudbaseinit.openstack.common import excutils
from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI
from cloudbaseinit.openstack.common import importutils
from cloudbaseinit.openstack.common import jsonutils
from cloudbaseinit.openstack.common import log as logging

@ -188,7 +188,7 @@ class ConsumerBase(object):
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
LOG.exception(_LE("Failed to process message... skipping it."))
finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)

@ -224,7 +224,7 @@ class DirectConsumer(ConsumerBase):
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
link_name = None
link_name = msg_id
else:
raise_invalid_topology_version()

@ -368,7 +368,7 @@ class DirectPublisher(Publisher):
"""Init a 'direct' publisher."""

if conf.qpid_topology_version == 1:
node_name = msg_id
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id

@ -444,6 +444,7 @@ class Connection(object):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")

self.connection = None
self.session = None
self.consumers = {}
self.consumer_thread = None

@ -467,7 +468,10 @@ class Connection(object):
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
self.connection_create(self.brokers[0])

brokers_count = len(self.brokers)
self.next_broker_indices = itertools.cycle(range(brokers_count))

self.reconnect()

def connection_create(self, broker):

@ -494,31 +498,29 @@ class Connection(object):

def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
# Close the session if necessary
if self.connection.opened():
if self.connection is not None and self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
except qpid_exceptions.MessagingError:
pass

broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
broker = self.brokers[next(self.next_broker_indices)]

try:
self.connection_create(broker)
self.connection.open()
except qpid_exceptions.ConnectionError as e:
except qpid_exceptions.MessagingError as e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
msg = _LE("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
delay = min(2 * delay, 60)
delay = min(delay + 1, 5)
else:
LOG.info(_('Connected to AMQP server on %s'), broker)
LOG.info(_LI('Connected to AMQP server on %s'), broker)
break

self.session = self.connection.session()
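
The qpid retry delay now grows linearly and caps at five seconds instead of doubling up to sixty. The sequence of sleeps it produces:

delay = 1
delays = []
for _ in range(6):
    delays.append(delay)
    delay = min(delay + 1, 5)
print(delays)  # [1, 2, 3, 4, 5, 5]
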

@ -531,14 +533,14 @@ class Connection(object):
consumer.reconnect(self.session)
self._register_consumer(consumer)

LOG.debug(_("Re-established AMQP queues"))
LOG.debug("Re-established AMQP queues")

def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError) as e:
qpid_exceptions.MessagingError) as e:
if error_callback:
error_callback(e)
self.reconnect()

@ -569,8 +571,8 @@ class Connection(object):
add it to our list of consumers
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
log_info = {'topic': topic, 'err_str': exc}
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)

def _declare_consumer():

@ -585,19 +587,19 @@ class Connection(object):

def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug('Timed out waiting for RPC response: %s' %
exc)
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
LOG.exception(_LE('Failed to consume message from queue: %s') %
exc)

def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
LOG.exception(_("Error processing message. Skipping it."))
LOG.exception(_LE("Error processing message. Skipping it."))

for iteration in itertools.count(0):
if limit and iteration >= limit:

@ -623,8 +625,8 @@ class Connection(object):
"""Send to a publisher based on the publisher class."""

def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
log_info = {'topic': topic, 'err_str': exc}
LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)

def _publisher_send():


@ -27,7 +27,7 @@ import six
from six import moves

from cloudbaseinit.openstack.common import excutils
from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI
from cloudbaseinit.openstack.common import importutils
from cloudbaseinit.openstack.common import jsonutils
from cloudbaseinit.openstack.common.rpc import common as rpc_common

@ -63,7 +63,7 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),

cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
cfg.IntOpt('rpc_zmq_topic_backlog',
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),

@ -93,12 +93,12 @@ def _serialize(data):
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
LOG.error(_LE("JSON serialization failed."))


def _deserialize(data):
"""Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
LOG.debug("Deserializing: %s", data)
return jsonutils.loads(data)


@ -133,9 +133,9 @@ class ZmqSocket(object):
str_data = {'addr': addr, 'type': self.socket_s(),
'subscribe': subscribe, 'bind': bind}

LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
LOG.debug(_("-> bind: %(bind)s"), str_data)
LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
LOG.debug("-> Subscribed to %(subscribe)s", str_data)
LOG.debug("-> bind: %(bind)s", str_data)

try:
if bind:

@ -155,7 +155,7 @@ class ZmqSocket(object):
"""Subscribe."""
if not self.can_sub:
raise RPCException("Cannot subscribe on this socket.")
LOG.debug(_("Subscribing to %s"), msg_filter)
LOG.debug("Subscribing to %s", msg_filter)

try:
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)

@ -192,7 +192,7 @@ class ZmqSocket(object):
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error(_("ZeroMQ socket could not be closed."))
LOG.error(_LE("ZeroMQ socket could not be closed."))
self.sock = None

def recv(self, **kwargs):

@ -264,7 +264,7 @@ class InternalContext(object):

def _get_response(self, ctx, proxy, topic, data):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
LOG.debug("Running func with context: %s", ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', {})

@ -277,13 +277,13 @@ class InternalContext(object):
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
LOG.debug("Expected exception during message handling (%s)" %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
LOG.error(_("Exception during message handling"))
LOG.error(_LE("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}

@ -302,7 +302,7 @@ class InternalContext(object):
self._get_response(ctx, proxy, topic, payload),
ctx.replies)

LOG.debug(_("Sending reply"))
LOG.debug("Sending reply")
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {

@ -320,7 +320,7 @@ class ConsumerBase(object):

@classmethod
def normalize_reply(self, result, replies):
#TODO(ewindisch): re-evaluate and document this method.
# TODO(ewindisch): re-evaluate and document this method.
if isinstance(result, types.GeneratorType):
return list(result)
elif replies:

@ -336,7 +336,7 @@ class ConsumerBase(object):
# processed internally. (non-valid method name)
method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
LOG.error(_LE("RPC message did not include method."))
return

# Internal method

@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase):
def register(self, proxy, in_addr, zmq_type_in,
in_bind=True, subscribe=None):

LOG.info(_("Registering reactor"))
LOG.info(_LI("Registering reactor"))

if zmq_type_in not in (zmq.PULL, zmq.SUB):
raise RPCException("Bad input socktype")

@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase):
self.proxies[inq] = proxy
self.sockets.append(inq)

LOG.info(_("In reactor registered"))
LOG.info(_LI("In reactor registered"))

def consume_in_thread(self):
@excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
LOG.info(_LI("Consuming socket"))
while True:
self.consume(sock)

@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):

if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
LOG.info(_LI("Creating proxy for topic: %s"), topic)

try:
# The topic is received over the network,

@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor):
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
LOG.error(_LE("Topic socket file creation failed."))
return

try:
self.topic_proxy[topic].put_nowait(data)
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
LOG.error(_LE("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})

def consume_in_thread(self):
"""Runs the ZmqProxy service."""

@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor):
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_("Required IPC directory does not exist at"
" %s") % (ipc_dir, ))
LOG.error(_LE("Required IPC directory does not exist at"
" %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,

@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor):
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
LOG.error(_("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
LOG.error(_LE("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))

super(ZmqProxy, self).consume_in_thread()

@ -539,9 +539,9 @@ class ZmqReactor(ZmqBaseReactor):
super(ZmqReactor, self).__init__(conf)

def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
LOG.debug("CONSUMER RECEIVED DATA: %s", data)

proxy = self.proxies[sock]

@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor):
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
return

self.pool.spawn_n(self.process, proxy, ctx, request)

@ -588,14 +588,14 @@ class Connection(rpc_common.Connection):
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))

if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
LOG.info(_LI("Skipping topic registration. Already registered."))
return

# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)

LOG.debug(_("Consumer is a zmq.%s"),
LOG.debug("Consumer is a zmq.%s",
['PULL', 'SUB'][sock_type == zmq.SUB])

self.reactor.register(proxy, inaddr, sock_type,

@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None,
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host

LOG.debug(_("Creating payload"))
LOG.debug("Creating payload")
# Curry the original request into a reply method.
mcontext = RpcContext.marshal(context)
payload = {

@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None,
}
}

LOG.debug(_("Creating queue socket for reply waiter"))
LOG.debug("Creating queue socket for reply waiter")

# Messages arriving async.
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt

@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None,
zmq.SUB, subscribe=msg_id, bind=False
)

LOG.debug(_("Sending cast"))
LOG.debug("Sending cast")
_cast(addr, context, topic, payload, envelope)

LOG.debug(_("Cast sent; Waiting reply"))
LOG.debug("Cast sent; Waiting reply")
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
LOG.debug("Received message: %s", msg)
LOG.debug("Unpacking response")

if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]

@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None,
Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})

queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
LOG.debug("Sending message(s) to: %s", queues)

# Don't stack if we have no matchmaker results
if not queues:


@ -22,7 +22,7 @@ import contextlib
import eventlet
from oslo.config import cfg

from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _, _LI
from cloudbaseinit.openstack.common import log as logging


@ -127,10 +127,10 @@ class MatchMakerBase(object):
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))

#NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
#def add_negate_binding(self, binding, rule, last=True):
# self.bindings.append((binding, rule, True, last))
# NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
# def add_negate_binding(self, binding, rule, last=True):
#     self.bindings.append((binding, rule, True, last))

def queues(self, key):
workers = []

@ -213,7 +213,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))

LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})

def start_heartbeat(self):


@ -34,7 +34,6 @@ matchmaker_redis_opts = [
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default=None,
help='Password for Redis server. (optional)'),
]


@ -22,7 +22,7 @@ import json

from oslo.config import cfg

from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _LW
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.openstack.common.rpc import matchmaker as mm

@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange):
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
_LW("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])

@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange):
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
_LW("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])


@ -48,7 +48,7 @@ class RpcProxy(object):
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
:param serializer: Optionaly (de-)serialize entities with a
:param serializer: Optionally (de-)serialize entities with a
provided helper.
"""
self.topic = topic


@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.

from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.openstack.common import rpc
from cloudbaseinit.openstack.common.rpc import dispatcher as rpc_dispatcher

@ -44,7 +43,7 @@ class Service(service.Service):
super(Service, self).start()

self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)

dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],


@ -38,9 +38,10 @@ from eventlet import event
from oslo.config import cfg

from cloudbaseinit.openstack.common import eventlet_backdoor
from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common.gettextutils import _LE, _LI, _LW
from cloudbaseinit.openstack.common import importutils
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.openstack.common import systemd
from cloudbaseinit.openstack.common import threadgroup


@ -163,7 +164,7 @@ class ServiceLauncher(Launcher):
status = None
signo = 0

LOG.debug(_('Full set of CONF:'))
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)

try:

@ -172,7 +173,7 @@ class ServiceLauncher(Launcher):
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:

@ -184,11 +185,12 @@ class ServiceLauncher(Launcher):
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
LOG.exception(_LE('Exception during rpc cleanup.'))

return status, signo

def wait(self, ready_callback=None):
systemd.notify_once()
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)

@ -235,7 +237,7 @@ class ProcessLauncher(object):
# dies unexpectedly
self.readpipe.read()

LOG.info(_('Parent process has died unexpectedly, exiting'))
LOG.info(_LI('Parent process has died unexpectedly, exiting'))

sys.exit(1)

@ -266,13 +268,13 @@ class ProcessLauncher(object):
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
LOG.info(_LI('Child caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
launcher.stop()

@ -305,7 +307,7 @@ class ProcessLauncher(object):
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping'))
LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1)

wrap.forktimes.pop(0)

@ -324,7 +326,7 @@ class ProcessLauncher(object):

os._exit(status)

LOG.info(_('Started child %d'), pid)
LOG.info(_LI('Started child %d'), pid)

wrap.children.add(pid)
self.children[pid] = wrap

@ -334,7 +336,7 @@ class ProcessLauncher(object):
def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers)

LOG.info(_('Starting %d workers'), wrap.workers)
LOG.info(_LI('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)

@ -351,15 +353,15 @@ class ProcessLauncher(object):

if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)s exited with status %(code)d'),
LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))

if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
LOG.warning(_LW('pid %d not in child list'), pid)
return None

wrap = self.children.pop(pid)

@ -381,23 +383,35 @@ class ProcessLauncher(object):
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""

LOG.debug(_('Full set of CONF:'))
systemd.notify_once()
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)

while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
try:
while True:
self.handle_signal()
self._respawn_children()
# No signal means that stop was called. Don't clean up here.
if not self.sigcaught:
return

signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break
LOG.info(_LI('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break

for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))

self.stop()

def stop(self):
"""Terminate child processes and wait on each."""
self.running = False
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)

@ -407,7 +421,7 @@ class ProcessLauncher(object):

# Wait for children to die
if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children))
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()


@ -22,17 +22,14 @@ from cloudbaseinit.openstack.common.gettextutils import _

ssl_opts = [
cfg.StrOpt('ca_file',
default=None,
help="CA certificate file to use to verify "
"connecting clients"),
"connecting clients."),
cfg.StrOpt('cert_file',
default=None,
help="Certificate file to use when starting "
"the server securely"),
"the server securely."),
cfg.StrOpt('key_file',
default=None,
help="Private key file to use when starting "
"the server securely"),
"the server securely."),
]

cloudbaseinit/openstack/common/strutils.py (new file, 311 lines)
@ -0,0 +1,311 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
System-level utilities and helper functions.
"""

import math
import re
import sys
import unicodedata

import six

from cloudbaseinit.openstack.common.gettextutils import _


UNIT_PREFIX_EXPONENT = {
'k': 1,
'K': 1,
'Ki': 1,
'M': 2,
'Mi': 2,
'G': 3,
'Gi': 3,
'T': 4,
'Ti': 4,
}
UNIT_SYSTEM_INFO = {
'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
}

TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')

SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")


# NOTE(flaper87): The following globals are used by `mask_password`
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']

# NOTE(ldbragst): Let's build a list of regex objects using the list of
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
# to the list of _SANITIZE_KEYS and we can generate regular expressions
# for XML and JSON automatically.
_SANITIZE_PATTERNS_2 = []
_SANITIZE_PATTERNS_1 = []

# NOTE(amrith): Some regular expressions have only one parameter, some
# have two parameters. Use different lists of patterns here.
_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+']
_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(%(key)s\s+[\"\']).*?([\"\'])',
r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)',
r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?'
'[\'"]).*?([\'"])',
r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)']

for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS_2:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS_2.append(reg_ex)

for pattern in _FORMAT_PATTERNS_1:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS_1.append(reg_ex)


def int_from_bool_as_string(subject):
"""Interpret a string as a boolean and return either 1 or 0.

Any string value in:

('True', 'true', 'On', 'on', '1')

is interpreted as a boolean True.

Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0


def bool_from_string(subject, strict=False, default=False):
"""Interpret a string as a boolean.

A case-insensitive match is performed such that strings matching 't',
'true', 'on', 'y', 'yes', or '1' are considered True and, when
`strict=False`, anything else returns the value specified by 'default'.

Useful for JSON-decoded stuff and config file parsing.

If `strict=True`, unrecognized values, including None, will raise a
ValueError which is useful when parsing values passed in from an API call.
Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
"""
if not isinstance(subject, six.string_types):
subject = six.text_type(subject)

lowered = subject.strip().lower()

if lowered in TRUE_STRINGS:
return True
elif lowered in FALSE_STRINGS:
return False
elif strict:
acceptable = ', '.join(
"'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
msg = _("Unrecognized value '%(val)s', acceptable values are:"
" %(acceptable)s") % {'val': subject,
'acceptable': acceptable}
raise ValueError(msg)
else:
return default
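
A few illustrative calls matching the documented behavior:

assert bool_from_string('YES') is True
assert bool_from_string('nope') is False            # default applies
assert bool_from_string('nope', default=True) is True
try:
    bool_from_string('nope', strict=True)
except ValueError as exc:
    print(exc)  # names the acceptable values
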


def safe_decode(text, incoming=None, errors='strict'):
"""Decodes incoming text/bytes string using `incoming` if they're not
already unicode.

:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a unicode `incoming` encoded
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be decoded" % type(text))

if isinstance(text, six.text_type):
return text

if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())

try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)


def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""Encodes incoming text/bytes string using `encoding`.

If incoming is not specified, text is expected to be encoded with
current python's default encoding. (`sys.getdefaultencoding`)

:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a bytestring `encoding` encoded
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be encoded" % type(text))

if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())

if isinstance(text, six.text_type):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors)
return text.encode(encoding, errors)
else:
return text
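
A round trip with the two helpers; UTF-8 is the default target encoding:

data = safe_encode(u'na\xefve')                 # -> 'na\xc3\xafve'
assert safe_decode(data, incoming='utf-8') == u'na\xefve'
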


def string_to_bytes(text, unit_system='IEC', return_int=False):
"""Converts a string into a float representation of bytes.

The units supported for IEC ::

Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
KB, KiB, MB, MiB, GB, GiB, TB, TiB

The units supported for SI ::

kb(it), Mb(it), Gb(it), Tb(it)
kB, MB, GB, TB

Note that the SI unit system does not support capital letter 'K'

:param text: String input for bytes size conversion.
:param unit_system: Unit system for byte size conversion.
:param return_int: If True, returns integer representation of text
in bytes. (default: decimal)
:returns: Numerical representation of text in bytes.
:raises ValueError: If text has an invalid value.

"""
try:
base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
except KeyError:
msg = _('Invalid unit system: "%s"') % unit_system
raise ValueError(msg)
match = reg_ex.match(text)
if match:
magnitude = float(match.group(1))
unit_prefix = match.group(2)
if match.group(3) in ['b', 'bit']:
magnitude /= 8
else:
msg = _('Invalid string format: %s') % text
raise ValueError(msg)
if not unit_prefix:
res = magnitude
else:
res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
if return_int:
return int(math.ceil(res))
return res
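
Examples of the unit handling described above:

assert string_to_bytes('1KB') == 1024.0              # IEC is the default
assert string_to_bytes('1kB', unit_system='SI') == 1000.0
assert string_to_bytes('8Kb') == 1024.0              # bit values divide by 8
assert string_to_bytes('1.5KB', return_int=True) == 1536
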


def to_slug(value, incoming=None, errors="strict"):
"""Normalize string.

Convert to lowercase, remove non-word characters, and convert spaces
to hyphens.

Inspired by Django's `slugify` filter.

:param value: Text to slugify
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: slugified unicode representation of `value`
:raises TypeError: If text is not an instance of str
"""
value = safe_decode(value, incoming, errors)
# NOTE(aababilov): no need to use safe_(encode|decode) here:
# encodings are always "ascii", error handling is always "ignore"
# and types are always known (first: unicode; second: str)
value = unicodedata.normalize("NFKD", value).encode(
"ascii", "ignore").decode("ascii")
value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
return SLUGIFY_HYPHENATE_RE.sub("-", value)
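
For example:

assert to_slug("Hello,  World!") == u'hello-world'
assert to_slug(u'Caf\xe9 Menu') == u'cafe-menu'
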


def mask_password(message, secret="***"):
"""Replace password with 'secret' in message.

:param message: The string which includes security information.
:param secret: value with which to replace passwords.
:returns: The unicode value of message with the password fields masked.

For example:

>>> mask_password("'adminPass' : 'aaaaa'")
"'adminPass' : '***'"
>>> mask_password("'admin_pass' : 'aaaaa'")
"'admin_pass' : '***'"
>>> mask_password('"password" : "aaaaa"')
'"password" : "***"'
>>> mask_password("'original_password' : 'aaaaa'")
"'original_password' : '***'"
>>> mask_password("u'original_password' : u'aaaaa'")
"u'original_password' : u'***'"
"""
message = six.text_type(message)

# NOTE(ldbragst): Check to see if anything in message contains any key
# specified in _SANITIZE_KEYS, if not then just return the message since
# we don't have to mask any passwords.
if not any(key in message for key in _SANITIZE_KEYS):
return message

substitute = r'\g<1>' + secret + r'\g<2>'
for pattern in _SANITIZE_PATTERNS_2:
message = re.sub(pattern, substitute, message)

substitute = r'\g<1>' + secret
for pattern in _SANITIZE_PATTERNS_1:
message = re.sub(pattern, substitute, message)

return message

cloudbaseinit/openstack/common/systemd.py (new file, 106 lines)
@ -0,0 +1,106 @@
# Copyright 2012-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Helper module for systemd service readiness notification.
"""

import os
import socket
import sys

from cloudbaseinit.openstack.common import log as logging


LOG = logging.getLogger(__name__)


def _abstractify(socket_name):
if socket_name.startswith('@'):
# abstract namespace socket
socket_name = '\0%s' % socket_name[1:]
return socket_name


def _sd_notify(unset_env, msg):
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
sock.connect(_abstractify(notify_socket))
sock.sendall(msg)
if unset_env:
del os.environ['NOTIFY_SOCKET']
except EnvironmentError:
LOG.debug("Systemd notification failed", exc_info=True)
finally:
sock.close()


def notify():
"""Send notification to Systemd that service is ready.

For details see
http://www.freedesktop.org/software/systemd/man/sd_notify.html
"""
_sd_notify(False, 'READY=1')


def notify_once():
"""Send notification once to Systemd that service is ready.

Systemd sets NOTIFY_SOCKET environment variable with the name of the
socket listening for notifications from services.
This method removes the NOTIFY_SOCKET environment variable to ensure
notification is sent only once.
"""
_sd_notify(True, 'READY=1')
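
A minimal usage sketch for a service entry point; the helper is a no-op when NOTIFY_SOCKET is unset, i.e. outside a Type=notify systemd unit, so calling it unconditionally is safe:

def main():
    # ... start listeners, spawn workers ... (placeholder for real setup)
    notify_once()  # tell systemd the service is ready, exactly once
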


def onready(notify_socket, timeout):
"""Wait for systemd style notification on the socket.

:param notify_socket: local socket address
:type notify_socket: string
:param timeout: socket timeout
:type timeout: float
:returns: 0 service ready
1 service not ready
2 timeout occurred
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.bind(_abstractify(notify_socket))
try:
msg = sock.recv(512)
except socket.timeout:
return 2
finally:
sock.close()
if 'READY=1' in msg:
return 0
else:
return 1


if __name__ == '__main__':
# simple CLI for testing
if len(sys.argv) == 1:
notify()
elif len(sys.argv) >= 2:
timeout = float(sys.argv[1])
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
retval = onready(notify_socket, timeout)
sys.exit(retval)

@ -85,7 +85,7 @@ class ThreadGroup(object):
def thread_done(self, thread):
self.threads.remove(thread)

def stop(self):
def _stop_threads(self):
current = threading.current_thread()

# Iterate over a copy of self.threads so thread_done doesn't

@ -99,6 +99,7 @@ class ThreadGroup(object):
except Exception as ex:
LOG.exception(ex)

def stop_timers(self):
for x in self.timers:
try:
x.stop()

@ -106,6 +107,23 @@ class ThreadGroup(object):
LOG.exception(ex)
self.timers = []

def stop(self, graceful=False):
"""stop function has the option of graceful=True/False.

* In case of graceful=True, wait for all threads to be finished.
Never kill threads.
* In case of graceful=False, kill threads immediately.
"""
self.stop_timers()
if graceful:
# In case of graceful=True, wait for all threads to be
# finished, never kill threads
self.wait()
else:
# In case of graceful=False(Default), kill threads
# immediately
self._stop_threads()
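
A short illustration of the new graceful flag; the import path follows this tree and the worker is a trivial sleep, so this is a sketch rather than a realistic service:

import time

from cloudbaseinit.openstack.common import threadgroup

tg = threadgroup.ThreadGroup()
tg.add_thread(time.sleep, 0.1)   # a short-lived worker
tg.stop(graceful=True)           # wait for it to finish instead of killing it
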

def wait(self):
for x in self.timers:
try:


@ -114,7 +114,7 @@ def utcnow():


def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formatted date from timestamp."""
"""Returns an iso8601 formatted date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))


@ -134,7 +134,7 @@ def set_time_override(override_time=None):

def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
assert utcnow.override_time is not None
try:
for dt in utcnow.override_time:
dt += timedelta


@ -18,7 +18,10 @@ Helpers for comparing version strings.
"""

import functools
import inspect

import pkg_resources
import six

from cloudbaseinit.openstack.common.gettextutils import _
from cloudbaseinit.openstack.common import log as logging

@ -52,18 +55,36 @@ class deprecated(object):
>>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1)
... def c(): pass

4. Specifying the deprecated functionality will not be removed:
>>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=0)
... def d(): pass

5. Specifying a replacement, deprecated functionality will not be removed:
>>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()', remove_in=0)
... def e(): pass

"""

# NOTE(morganfainberg): Bexar is used for unit test purposes, it is
# expected we maintain a gap between Bexar and Folsom in this list.
BEXAR = 'B'
FOLSOM = 'F'
GRIZZLY = 'G'
HAVANA = 'H'
ICEHOUSE = 'I'
JUNO = 'J'
KILO = 'K'

_RELEASES = {
# NOTE(morganfainberg): Bexar is used for unit test purposes, it is
# expected we maintain a gap between Bexar and Folsom in this list.
'B': 'Bexar',
'F': 'Folsom',
'G': 'Grizzly',
'H': 'Havana',
'I': 'Icehouse',
'J': 'Juno',
'K': 'Kilo',
}

_deprecated_msg_with_alternative = _(

@ -74,6 +95,12 @@ class deprecated(object):
'%(what)s is deprecated as of %(as_of)s and may be '
'removed in %(remove_in)s. It will not be superseded.')

_deprecated_msg_with_alternative_no_removal = _(
'%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s.')

_deprecated_msg_with_no_alternative_no_removal = _(
'%(what)s is deprecated as of %(as_of)s. It will not be superseded.')

def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None):
"""Initialize decorator

@ -91,16 +118,34 @@ class deprecated(object):
self.remove_in = remove_in
self.what = what

def __call__(self, func):
def __call__(self, func_or_cls):
if not self.what:
self.what = func.__name__ + '()'
self.what = func_or_cls.__name__ + '()'
msg, details = self._build_message()

@functools.wraps(func)
def wrapped(*args, **kwargs):
msg, details = self._build_message()
LOG.deprecated(msg, details)
return func(*args, **kwargs)
return wrapped
if inspect.isfunction(func_or_cls):

@six.wraps(func_or_cls)
def wrapped(*args, **kwargs):
LOG.deprecated(msg, details)
return func_or_cls(*args, **kwargs)
return wrapped
elif inspect.isclass(func_or_cls):
orig_init = func_or_cls.__init__

# TODO(tsufiev): change `functools` module to `six` as
# soon as six 1.7.4 (with fix for passing `assigned`
# argument to underlying `functools.wraps`) is released
# and added to the cloudbaseinit-incubator requirements
@functools.wraps(orig_init, assigned=('__name__', '__doc__'))
def new_init(self, *args, **kwargs):
LOG.deprecated(msg, details)
orig_init(self, *args, **kwargs)
func_or_cls.__init__ = new_init
return func_or_cls
else:
raise TypeError('deprecated can be used only with functions or '
'classes')
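
With this change the decorator wraps classes as well as functions. A sketch, using the release constants defined on the class; OldHelper/NewHelper are hypothetical names:

@deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='NewHelper')
class OldHelper(object):
    def __init__(self):
        self.ready = True

OldHelper()  # logs the deprecation message on each instantiation
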

def _get_safe_to_remove_release(self, release):
# TODO(dstanek): this method will have to be reimplemented once

@ -119,9 +164,19 @@ class deprecated(object):

if self.in_favor_of:
details['in_favor_of'] = self.in_favor_of
msg = self._deprecated_msg_with_alternative
if self.remove_in > 0:
msg = self._deprecated_msg_with_alternative
else:
# There are no plans to remove this function, but it is
# now deprecated.
msg = self._deprecated_msg_with_alternative_no_removal
else:
msg = self._deprecated_msg_no_alternative
if self.remove_in > 0:
msg = self._deprecated_msg_no_alternative
else:
# There are no plans to remove this function, but it is
# now deprecated.
msg = self._deprecated_msg_with_no_alternative_no_removal
return msg, details