From 2a9be0e16da9d982460802e34183f108e1c969a8 Mon Sep 17 00:00:00 2001 From: Alessandro Pilotti Date: Sun, 7 Sep 2014 20:50:01 +0300 Subject: [PATCH] Updates Oslo modules --- cloudbaseinit/openstack/common/__init__.py | 17 + cloudbaseinit/openstack/common/context.py | 28 +- .../openstack/common/eventlet_backdoor.py | 9 +- cloudbaseinit/openstack/common/excutils.py | 32 +- .../openstack/common/gettextutils.py | 223 +++++++------ cloudbaseinit/openstack/common/importutils.py | 11 +- cloudbaseinit/openstack/common/jsonutils.py | 48 ++- cloudbaseinit/openstack/common/log.py | 284 ++++++++-------- cloudbaseinit/openstack/common/loopingcall.py | 46 +-- .../openstack/common/network_utils.py | 92 +++++- .../openstack/common/notifier/api.py | 13 +- .../openstack/common/notifier/proxy.py | 5 +- .../openstack/common/notifier/rpc_notifier.py | 6 +- .../common/notifier/rpc_notifier2.py | 6 +- cloudbaseinit/openstack/common/rpc/amqp.py | 28 +- cloudbaseinit/openstack/common/rpc/common.py | 10 +- .../openstack/common/rpc/impl_fake.py | 8 +- .../openstack/common/rpc/impl_kombu.py | 62 ++-- .../openstack/common/rpc/impl_qpid.py | 54 +-- .../openstack/common/rpc/impl_zmq.py | 82 ++--- .../openstack/common/rpc/matchmaker.py | 12 +- .../openstack/common/rpc/matchmaker_redis.py | 1 - .../openstack/common/rpc/matchmaker_ring.py | 10 +- cloudbaseinit/openstack/common/rpc/proxy.py | 2 +- cloudbaseinit/openstack/common/rpc/service.py | 3 +- cloudbaseinit/openstack/common/service.py | 66 ++-- cloudbaseinit/openstack/common/sslutils.py | 9 +- cloudbaseinit/openstack/common/strutils.py | 311 ++++++++++++++++++ cloudbaseinit/openstack/common/systemd.py | 106 ++++++ cloudbaseinit/openstack/common/threadgroup.py | 20 +- cloudbaseinit/openstack/common/timeutils.py | 4 +- .../openstack/common/versionutils.py | 75 ++++- 32 files changed, 1216 insertions(+), 467 deletions(-) create mode 100644 cloudbaseinit/openstack/common/strutils.py create mode 100644 cloudbaseinit/openstack/common/systemd.py diff --git a/cloudbaseinit/openstack/common/__init__.py b/cloudbaseinit/openstack/common/__init__.py index e69de29b..d1223eaf 100644 --- a/cloudbaseinit/openstack/common/__init__.py +++ b/cloudbaseinit/openstack/common/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + + +six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox')) diff --git a/cloudbaseinit/openstack/common/context.py b/cloudbaseinit/openstack/common/context.py index 182b0443..b612db71 100644 --- a/cloudbaseinit/openstack/common/context.py +++ b/cloudbaseinit/openstack/common/context.py @@ -25,7 +25,7 @@ import uuid def generate_request_id(): - return 'req-%s' % str(uuid.uuid4()) + return b'req-' + str(uuid.uuid4()).encode('ascii') class RequestContext(object): @@ -77,6 +77,21 @@ class RequestContext(object): 'instance_uuid': self.instance_uuid, 'user_identity': user_idt} + @classmethod + def from_dict(cls, ctx): + return cls( + auth_token=ctx.get("auth_token"), + user=ctx.get("user"), + tenant=ctx.get("tenant"), + domain=ctx.get("domain"), + user_domain=ctx.get("user_domain"), + project_domain=ctx.get("project_domain"), + is_admin=ctx.get("is_admin", False), + read_only=ctx.get("read_only", False), + show_deleted=ctx.get("show_deleted", False), + request_id=ctx.get("request_id"), + instance_uuid=ctx.get("instance_uuid")) + def get_admin_context(show_deleted=False): context = RequestContext(None, @@ -98,3 +113,14 @@ def get_context_from_function_and_args(function, args, kwargs): return arg return None + + +def is_user_context(context): + """Indicates if the request context is a normal user.""" + if not context: + return False + if context.is_admin: + return False + if not context.user_id or not context.project_id: + return False + return True diff --git a/cloudbaseinit/openstack/common/eventlet_backdoor.py b/cloudbaseinit/openstack/common/eventlet_backdoor.py index d7cc819c..e4353a73 100644 --- a/cloudbaseinit/openstack/common/eventlet_backdoor.py +++ b/cloudbaseinit/openstack/common/eventlet_backdoor.py @@ -29,7 +29,7 @@ import eventlet.backdoor import greenlet from oslo.config import cfg -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LI from cloudbaseinit.openstack.common import log as logging help_for_backdoor_port = ( @@ -41,7 +41,6 @@ help_for_backdoor_port = ( "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ cfg.StrOpt('backdoor_port', - default=None, help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] @@ -137,8 +136,10 @@ def initialize_if_enabled(): # In the case of backdoor port being zero, a port number is assigned by # listen(). In any case, pull the port number out here. port = sock.getsockname()[1] - LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') % - {'port': port, 'pid': os.getpid()}) + LOG.info( + _LI('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()} + ) eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) return port diff --git a/cloudbaseinit/openstack/common/excutils.py b/cloudbaseinit/openstack/common/excutils.py index ecd9e320..0c4c42f2 100644 --- a/cloudbaseinit/openstack/common/excutils.py +++ b/cloudbaseinit/openstack/common/excutils.py @@ -24,7 +24,7 @@ import traceback import six -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LE class save_and_reraise_exception(object): @@ -49,9 +49,22 @@ class save_and_reraise_exception(object): decide_if_need_reraise() if not should_be_reraised: ctxt.reraise = False + + If another exception occurs and reraise flag is False, + the saved exception will not be logged. + + If the caller wants to raise new exception during exception handling + he/she sets reraise to False initially with an ability to set it back to + True if needed:: + + except Exception: + with save_and_reraise_exception(reraise=False) as ctxt: + [if statements to determine whether to raise a new exception] + # Not raising a new exception, so reraise + ctxt.reraise = True """ - def __init__(self): - self.reraise = True + def __init__(self, reraise=True): + self.reraise = reraise def __enter__(self): self.type_, self.value, self.tb, = sys.exc_info() @@ -59,10 +72,11 @@ class save_and_reraise_exception(object): def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: - logging.error(_('Original exception being dropped: %s'), - traceback.format_exception(self.type_, - self.value, - self.tb)) + if self.reraise: + logging.error(_LE('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) return False if self.reraise: six.reraise(self.type_, self.value, self.tb) @@ -88,8 +102,8 @@ def forever_retry_uncaught_exceptions(infunc): if (cur_time - last_log_time > 60 or this_exc_message != last_exc_message): logging.exception( - _('Unexpected exception occurred %d time(s)... ' - 'retrying.') % exc_count) + _LE('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) last_log_time = cur_time last_exc_message = this_exc_message exc_count = 0 diff --git a/cloudbaseinit/openstack/common/gettextutils.py b/cloudbaseinit/openstack/common/gettextutils.py index 72f6b026..6cdd74f7 100644 --- a/cloudbaseinit/openstack/common/gettextutils.py +++ b/cloudbaseinit/openstack/common/gettextutils.py @@ -27,18 +27,119 @@ import gettext import locale from logging import handlers import os -import re from babel import localedata import six -_localedir = os.environ.get('cloudbaseinit'.upper() + '_LOCALEDIR') -_t = gettext.translation('cloudbaseinit', localedir=_localedir, fallback=True) - _AVAILABLE_LANGUAGES = {} + +# FIXME(dhellmann): Remove this when moving to oslo.i18n. USE_LAZY = False +class TranslatorFactory(object): + """Create translator functions + """ + + def __init__(self, domain, localedir=None): + """Establish a set of translation functions for the domain. + + :param domain: Name of translation domain, + specifying a message catalog. + :type domain: str + :param lazy: Delays translation until a message is emitted. + Defaults to False. + :type lazy: Boolean + :param localedir: Directory with translation catalogs. + :type localedir: str + """ + self.domain = domain + if localedir is None: + localedir = os.environ.get(domain.upper() + '_LOCALEDIR') + self.localedir = localedir + + def _make_translation_func(self, domain=None): + """Return a new translation function ready for use. + + Takes into account whether or not lazy translation is being + done. + + The domain can be specified to override the default from the + factory, but the localedir from the factory is always used + because we assume the log-level translation catalogs are + installed in the same directory as the main application + catalog. + + """ + if domain is None: + domain = self.domain + t = gettext.translation(domain, + localedir=self.localedir, + fallback=True) + # Use the appropriate method of the translation object based + # on the python version. + m = t.gettext if six.PY3 else t.ugettext + + def f(msg): + """oslo.i18n.gettextutils translation function.""" + if USE_LAZY: + return Message(msg, domain=domain) + return m(msg) + return f + + @property + def primary(self): + "The default translation function." + return self._make_translation_func() + + def _make_log_translation_func(self, level): + return self._make_translation_func(self.domain + '-log-' + level) + + @property + def log_info(self): + "Translate info-level log messages." + return self._make_log_translation_func('info') + + @property + def log_warning(self): + "Translate warning-level log messages." + return self._make_log_translation_func('warning') + + @property + def log_error(self): + "Translate error-level log messages." + return self._make_log_translation_func('error') + + @property + def log_critical(self): + "Translate critical-level log messages." + return self._make_log_translation_func('critical') + + +# NOTE(dhellmann): When this module moves out of the incubator into +# oslo.i18n, these global variables can be moved to an integration +# module within each application. + +# Create the global translation functions. +_translators = TranslatorFactory('cloudbaseinit') + +# The primary translation function using the well-known name "_" +_ = _translators.primary + +# Translators for log levels. +# +# The abbreviated names are meant to reflect the usual use of a short +# name like '_'. The "L" is for "log" and the other letter comes from +# the level. +_LI = _translators.log_info +_LW = _translators.log_warning +_LE = _translators.log_error +_LC = _translators.log_critical + +# NOTE(dhellmann): End of globals that will move to the application's +# integration module. + + def enable_lazy(): """Convenience function for configuring _() to use lazy gettext @@ -51,16 +152,7 @@ def enable_lazy(): USE_LAZY = True -def _(msg): - if USE_LAZY: - return Message(msg, domain='cloudbaseinit') - else: - if six.PY3: - return _t.gettext(msg) - return _t.ugettext(msg) - - -def install(domain, lazy=False): +def install(domain): """Install a _() function using the given translation domain. Given a translation domain, install a _() function using gettext's @@ -71,43 +163,14 @@ def install(domain, lazy=False): a translation-domain-specific environment variable (e.g. NOVA_LOCALEDIR). + Note that to enable lazy translation, enable_lazy must be + called. + :param domain: the translation domain - :param lazy: indicates whether or not to install the lazy _() function. - The lazy _() introduces a way to do deferred translation - of messages by installing a _ that builds Message objects, - instead of strings, which can then be lazily translated into - any available locale. """ - if lazy: - # NOTE(mrodden): Lazy gettext functionality. - # - # The following introduces a deferred way to do translations on - # messages in OpenStack. We override the standard _() function - # and % (format string) operation to build Message objects that can - # later be translated when we have more information. - def _lazy_gettext(msg): - """Create and return a Message object. - - Lazy gettext function for a given domain, it is a factory method - for a project/module to get a lazy gettext function for its own - translation domain (i.e. nova, glance, cinder, etc.) - - Message encapsulates a string so that we can translate - it later when needed. - """ - return Message(msg, domain=domain) - - from six import moves - moves.builtins.__dict__['_'] = _lazy_gettext - else: - localedir = '%s_LOCALEDIR' % domain.upper() - if six.PY3: - gettext.install(domain, - localedir=os.environ.get(localedir)) - else: - gettext.install(domain, - localedir=os.environ.get(localedir), - unicode=True) + from six import moves + tf = TranslatorFactory(domain) + moves.builtins.__dict__['_'] = tf.primary class Message(six.text_type): @@ -214,47 +277,22 @@ class Message(six.text_type): if other is None: params = (other,) elif isinstance(other, dict): - params = self._trim_dictionary_parameters(other) + # Merge the dictionaries + # Copy each item in case one does not support deep copy. + params = {} + if isinstance(self.params, dict): + for key, val in self.params.items(): + params[key] = self._copy_param(val) + for key, val in other.items(): + params[key] = self._copy_param(val) else: params = self._copy_param(other) return params - def _trim_dictionary_parameters(self, dict_param): - """Return a dict that only has matching entries in the msgid.""" - # NOTE(luisg): Here we trim down the dictionary passed as parameters - # to avoid carrying a lot of unnecessary weight around in the message - # object, for example if someone passes in Message() % locals() but - # only some params are used, and additionally we prevent errors for - # non-deepcopyable objects by unicoding() them. - - # Look for %(param) keys in msgid; - # Skip %% and deal with the case where % is first character on the line - keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid) - - # If we don't find any %(param) keys but have a %s - if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid): - # Apparently the full dictionary is the parameter - params = self._copy_param(dict_param) - else: - params = {} - # Save our existing parameters as defaults to protect - # ourselves from losing values if we are called through an - # (erroneous) chain that builds a valid Message with - # arguments, and then does something like "msg % kwds" - # where kwds is an empty dictionary. - src = {} - if isinstance(self.params, dict): - src.update(self.params) - src.update(dict_param) - for key in keys: - params[key] = self._copy_param(src[key]) - - return params - def _copy_param(self, param): try: return copy.deepcopy(param) - except TypeError: + except Exception: # Fallback to casting to unicode this will handle the # python code-like objects that can't be deep-copied return six.text_type(param) @@ -266,13 +304,14 @@ class Message(six.text_type): def __radd__(self, other): return self.__add__(other) - def __str__(self): - # NOTE(luisg): Logging in python 2.6 tries to str() log records, - # and it expects specifically a UnicodeError in order to proceed. - msg = _('Message objects do not support str() because they may ' - 'contain non-ascii characters. ' - 'Please use unicode() or translate() instead.') - raise UnicodeError(msg) + if six.PY2: + def __str__(self): + # NOTE(luisg): Logging in python 2.6 tries to str() log records, + # and it expects specifically a UnicodeError in order to proceed. + msg = _('Message objects do not support str() because they may ' + 'contain non-ascii characters. ' + 'Please use unicode() or translate() instead.') + raise UnicodeError(msg) def get_available_languages(domain): @@ -315,8 +354,8 @@ def get_available_languages(domain): 'zh_Hant_HK': 'zh_HK', 'zh_Hant': 'zh_TW', 'fil': 'tl_PH'} - for (locale, alias) in six.iteritems(aliases): - if locale in language_list and alias not in language_list: + for (locale_, alias) in six.iteritems(aliases): + if locale_ in language_list and alias not in language_list: language_list.append(alias) _AVAILABLE_LANGUAGES[domain] = language_list diff --git a/cloudbaseinit/openstack/common/importutils.py b/cloudbaseinit/openstack/common/importutils.py index 4fd9ae2b..d61bc6b2 100644 --- a/cloudbaseinit/openstack/common/importutils.py +++ b/cloudbaseinit/openstack/common/importutils.py @@ -24,10 +24,10 @@ import traceback def import_class(import_str): """Returns a class from a string including module and class.""" mod_str, _sep, class_str = import_str.rpartition('.') + __import__(mod_str) try: - __import__(mod_str) return getattr(sys.modules[mod_str], class_str) - except (ValueError, AttributeError): + except AttributeError: raise ImportError('Class %s cannot be found (%s)' % (class_str, traceback.format_exception(*sys.exc_info()))) @@ -58,6 +58,13 @@ def import_module(import_str): return sys.modules[import_str] +def import_versioned_module(version, submodule=None): + module = 'cloudbaseinit.v%s' % version + if submodule: + module = '.'.join((module, submodule)) + return import_module(module) + + def try_import(import_str, default=None): """Try to import a module and if it fails return default.""" try: diff --git a/cloudbaseinit/openstack/common/jsonutils.py b/cloudbaseinit/openstack/common/jsonutils.py index a6350a3b..53554f93 100644 --- a/cloudbaseinit/openstack/common/jsonutils.py +++ b/cloudbaseinit/openstack/common/jsonutils.py @@ -31,25 +31,37 @@ This module provides a few things: ''' +import codecs import datetime import functools import inspect import itertools -import json -try: - import xmlrpclib -except ImportError: - # NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3 - # however the function and object call signatures - # remained the same. This whole try/except block should - # be removed and replaced with a call to six.moves once - # six 1.4.2 is released. See http://bit.ly/1bqrVzu - import xmlrpc.client as xmlrpclib +import sys + +is_simplejson = False +if sys.version_info < (2, 7): + # On Python <= 2.6, json module is not C boosted, so try to use + # simplejson module if available + try: + import simplejson as json + # NOTE(mriedem): Make sure we have a new enough version of simplejson + # to support the namedobject_as_tuple argument. This can be removed + # in the Kilo release when python 2.6 support is dropped. + if 'namedtuple_as_object' in inspect.getargspec(json.dumps).args: + is_simplejson = True + else: + import json + except ImportError: + import json +else: + import json import six +import six.moves.xmlrpc_client as xmlrpclib from cloudbaseinit.openstack.common import gettextutils from cloudbaseinit.openstack.common import importutils +from cloudbaseinit.openstack.common import strutils from cloudbaseinit.openstack.common import timeutils netaddr = importutils.try_import("netaddr") @@ -161,15 +173,23 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, def dumps(value, default=to_primitive, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False return json.dumps(value, default=default, **kwargs) -def loads(s): - return json.loads(s) +def dump(obj, fp, *args, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False + return json.dump(obj, fp, *args, **kwargs) -def load(s): - return json.load(s) +def loads(s, encoding='utf-8', **kwargs): + return json.loads(strutils.safe_decode(s, encoding), **kwargs) + + +def load(fp, encoding='utf-8', **kwargs): + return json.load(codecs.getreader(encoding)(fp), **kwargs) try: diff --git a/cloudbaseinit/openstack/common/log.py b/cloudbaseinit/openstack/common/log.py index 92ec28b1..84e3938b 100644 --- a/cloudbaseinit/openstack/common/log.py +++ b/cloudbaseinit/openstack/common/log.py @@ -15,7 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. -"""Openstack logging handler. +"""OpenStack logging handler. This module adds to logging functionality by adding the option to specify a context object when calling the various log methods. If the context object @@ -33,7 +33,7 @@ import logging import logging.config import logging.handlers import os -import re +import socket import sys import traceback @@ -41,31 +41,19 @@ from oslo.config import cfg import six from six import moves +_PY26 = sys.version_info[0:2] == (2, 6) + from cloudbaseinit.openstack.common.gettextutils import _ from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import jsonutils from cloudbaseinit.openstack.common import local +# NOTE(flaper87): Pls, remove when graduating this module +# from the incubator. +from cloudbaseinit.openstack.common.strutils import mask_password # noqa _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" -_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password'] - -# NOTE(ldbragst): Let's build a list of regex objects using the list of -# _SANITIZE_KEYS we already have. This way, we only have to add the new key -# to the list of _SANITIZE_KEYS and we can generate regular expressions -# for XML and JSON automatically. -_SANITIZE_PATTERNS = [] -_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', - r'(<%(key)s>).*?()', - r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', - r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] - -for key in _SANITIZE_KEYS: - for pattern in _FORMAT_PATTERNS: - reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) - _SANITIZE_PATTERNS.append(reg_ex) - common_cli_opts = [ cfg.BoolOpt('debug', @@ -84,14 +72,11 @@ logging_cli_opts = [ cfg.StrOpt('log-config-append', metavar='PATH', deprecated_name='log-config', - help='The name of logging configuration file. It does not ' - 'disable existing loggers, but just appends specified ' - 'logging configuration to any other existing logging ' - 'options. Please see the Python logging module ' - 'documentation for details on logging configuration ' - 'files.'), + help='The name of a logging configuration file. This file ' + 'is appended to any existing logging configuration ' + 'files. For details about logging configuration files, ' + 'see the Python logging module documentation.'), cfg.StrOpt('log-format', - default=None, metavar='FORMAT', help='DEPRECATED. ' 'A logging.Formatter log message format string which may ' @@ -103,7 +88,7 @@ logging_cli_opts = [ default=_DEFAULT_LOG_DATE_FORMAT, metavar='DATE_FORMAT', help='Format string for %%(asctime)s in log records. ' - 'Default: %(default)s'), + 'Default: %(default)s .'), cfg.StrOpt('log-file', metavar='PATH', deprecated_name='logfile', @@ -112,79 +97,78 @@ logging_cli_opts = [ cfg.StrOpt('log-dir', deprecated_name='logdir', help='(Optional) The base directory used for relative ' - '--log-file paths'), + '--log-file paths.'), cfg.BoolOpt('use-syslog', default=False, help='Use syslog for logging. ' 'Existing syslog format is DEPRECATED during I, ' - 'and then will be changed in J to honor RFC5424'), + 'and will change in J to honor RFC5424.'), cfg.BoolOpt('use-syslog-rfc-format', # TODO(bogdando) remove or use True after existing # syslog format deprecation in J default=False, - help='(Optional) Use syslog rfc5424 format for logging. ' - 'If enabled, will add APP-NAME (RFC5424) before the ' - 'MSG part of the syslog message. The old format ' - 'without APP-NAME is deprecated in I, ' + help='(Optional) Enables or disables syslog rfc5424 format ' + 'for logging. If enabled, prefixes the MSG part of the ' + 'syslog message with APP-NAME (RFC5424). The ' + 'format without the APP-NAME is deprecated in I, ' 'and will be removed in J.'), cfg.StrOpt('syslog-log-facility', default='LOG_USER', - help='Syslog facility to receive log lines') + help='Syslog facility to receive log lines.') ] generic_log_opts = [ cfg.BoolOpt('use_stderr', default=True, - help='Log output to standard error') + help='Log output to standard error.') ] +DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'boto=WARN', + 'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO', + 'oslo.messaging=INFO', 'iso8601=WARN', + 'requests.packages.urllib3.connectionpool=WARN', + 'urllib3.connectionpool=WARN', 'websocket=WARN', + "keystonemiddleware=WARN", "routes.middleware=WARN", + "stevedore=WARN"] + log_opts = [ cfg.StrOpt('logging_context_format_string', default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' '%(name)s [%(request_id)s %(user_identity)s] ' '%(instance)s%(message)s', - help='Format string to use for log messages with context'), + help='Format string to use for log messages with context.'), cfg.StrOpt('logging_default_format_string', default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' '%(name)s [-] %(instance)s%(message)s', - help='Format string to use for log messages without context'), + help='Format string to use for log messages without context.'), cfg.StrOpt('logging_debug_format_suffix', default='%(funcName)s %(pathname)s:%(lineno)d', - help='Data to append to log format when level is DEBUG'), + help='Data to append to log format when level is DEBUG.'), cfg.StrOpt('logging_exception_prefix', default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' '%(instance)s', - help='Prefix each line of exception output with this format'), + help='Prefix each line of exception output with this format.'), cfg.ListOpt('default_log_levels', - default=[ - 'amqp=WARN', - 'amqplib=WARN', - 'boto=WARN', - 'qpid=WARN', - 'sqlalchemy=WARN', - 'suds=INFO', - 'iso8601=WARN', - 'requests.packages.urllib3.connectionpool=WARN' - ], - help='List of logger=LEVEL pairs'), + default=DEFAULT_LOG_LEVELS, + help='List of logger=LEVEL pairs.'), cfg.BoolOpt('publish_errors', default=False, - help='Publish error events'), + help='Enables or disables publication of error events.'), cfg.BoolOpt('fatal_deprecations', default=False, - help='Make deprecations fatal'), + help='Enables or disables fatal status of deprecations.'), # NOTE(mikal): there are two options here because sometimes we are handed # a full instance (and could include more information), and other times we # are just handed a UUID for the instance. cfg.StrOpt('instance_format', default='[instance: %(uuid)s] ', - help='If an instance is passed with the log message, format ' - 'it like this'), + help='The format for an instance that is passed with the log ' + 'message.'), cfg.StrOpt('instance_uuid_format', default='[instance: %(uuid)s] ', - help='If an instance UUID is passed with the log message, ' - 'format it like this'), + help='The format for an instance UUID that is passed with the ' + 'log message.'), ] CONF = cfg.CONF @@ -243,45 +227,20 @@ def _get_log_file_path(binary=None): return None -def mask_password(message, secret="***"): - """Replace password with 'secret' in message. - - :param message: The string which includes security information. - :param secret: value with which to replace passwords. - :returns: The unicode value of message with the password fields masked. - - For example: - - >>> mask_password("'adminPass' : 'aaaaa'") - "'adminPass' : '***'" - >>> mask_password("'admin_pass' : 'aaaaa'") - "'admin_pass' : '***'" - >>> mask_password('"password" : "aaaaa"') - '"password" : "***"' - >>> mask_password("'original_password' : 'aaaaa'") - "'original_password' : '***'" - >>> mask_password("u'original_password' : u'aaaaa'") - "u'original_password' : u'***'" - """ - message = six.text_type(message) - - # NOTE(ldbragst): Check to see if anything in message contains any key - # specified in _SANITIZE_KEYS, if not then just return the message since - # we don't have to mask any passwords. - if not any(key in message for key in _SANITIZE_KEYS): - return message - - secret = r'\g<1>' + secret + r'\g<2>' - for pattern in _SANITIZE_PATTERNS: - message = re.sub(pattern, secret, message) - return message - - class BaseLoggerAdapter(logging.LoggerAdapter): def audit(self, msg, *args, **kwargs): self.log(logging.AUDIT, msg, *args, **kwargs) + def isEnabledFor(self, level): + if _PY26: + # This method was added in python 2.7 (and it does the exact + # same logic, so we need to do the exact same logic so that + # python 2.6 has this capability as well). + return self.logger.isEnabledFor(level) + else: + return super(BaseLoggerAdapter, self).isEnabledFor(level) + class LazyAdapter(BaseLoggerAdapter): def __init__(self, name='unknown', version='unknown'): @@ -294,6 +253,11 @@ class LazyAdapter(BaseLoggerAdapter): def logger(self): if not self._logger: self._logger = getLogger(self.name, self.version) + if six.PY3: + # In Python 3, the code fails because the 'manager' attribute + # cannot be found when using a LoggerAdapter as the + # underlying logger. Work around this issue. + self._logger.manager = self._logger.logger.manager return self._logger @@ -304,25 +268,45 @@ class ContextAdapter(BaseLoggerAdapter): self.logger = logger self.project = project_name self.version = version_string + self._deprecated_messages_sent = dict() @property def handlers(self): return self.logger.handlers def deprecated(self, msg, *args, **kwargs): + """Call this method when a deprecated feature is used. + + If the system is configured for fatal deprecations then the message + is logged at the 'critical' level and :class:`DeprecatedConfig` will + be raised. + + Otherwise, the message will be logged (once) at the 'warn' level. + + :raises: :class:`DeprecatedConfig` if the system is configured for + fatal deprecations. + + """ stdmsg = _("Deprecated: %s") % msg if CONF.fatal_deprecations: self.critical(stdmsg, *args, **kwargs) raise DeprecatedConfig(msg=stdmsg) - else: - self.warn(stdmsg, *args, **kwargs) + + # Using a list because a tuple with dict can't be stored in a set. + sent_args = self._deprecated_messages_sent.setdefault(msg, list()) + + if args in sent_args: + # Already logged this message, so don't log it again. + return + + sent_args.append(args) + self.warn(stdmsg, *args, **kwargs) def process(self, msg, kwargs): - # NOTE(mrodden): catch any Message/other object and - # coerce to unicode before they can get - # to the python logging and possibly - # cause string encoding trouble - if not isinstance(msg, six.string_types): + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(msg, six.text_type): msg = six.text_type(msg) if 'extra' not in kwargs: @@ -336,7 +320,7 @@ class ContextAdapter(BaseLoggerAdapter): extra.update(_dictify_context(context)) instance = kwargs.pop('instance', None) - instance_uuid = (extra.get('instance_uuid', None) or + instance_uuid = (extra.get('instance_uuid') or kwargs.pop('instance_uuid', None)) instance_extra = '' if instance: @@ -402,9 +386,7 @@ class JSONFormatter(logging.Formatter): def _create_logging_excepthook(product_name): def logging_excepthook(exc_type, value, tb): - extra = {} - if CONF.verbose or CONF.debug: - extra['exc_info'] = (exc_type, value, tb) + extra = {'exc_info': (exc_type, value, tb)} getLogger(product_name).critical( "".join(traceback.format_exception_only(exc_type, value)), **extra) @@ -428,8 +410,8 @@ def _load_log_config(log_config_append): try: logging.config.fileConfig(log_config_append, disable_existing_loggers=False) - except moves.configparser.Error as exc: - raise LogConfigError(log_config_append, str(exc)) + except (moves.configparser.Error, KeyError) as exc: + raise LogConfigError(log_config_append, six.text_type(exc)) def setup(product_name, version='unknown'): @@ -441,10 +423,20 @@ def setup(product_name, version='unknown'): sys.excepthook = _create_logging_excepthook(product_name) -def set_defaults(logging_context_format_string): - cfg.set_defaults(log_opts, - logging_context_format_string= - logging_context_format_string) +def set_defaults(logging_context_format_string=None, + default_log_levels=None): + # Just in case the caller is not setting the + # default_log_level. This is insurance because + # we introduced the default_log_level parameter + # later in a backwards in-compatible change + if default_log_levels is not None: + cfg.set_defaults( + log_opts, + default_log_levels=default_log_levels) + if logging_context_format_string is not None: + cfg.set_defaults( + log_opts, + logging_context_format_string=logging_context_format_string) def _find_facility_from_conf(): @@ -474,10 +466,16 @@ def _find_facility_from_conf(): class RFCSysLogHandler(logging.handlers.SysLogHandler): def __init__(self, *args, **kwargs): self.binary_name = _get_binary_name() - super(RFCSysLogHandler, self).__init__(*args, **kwargs) + # Do not use super() unless type(logging.handlers.SysLogHandler) + # is 'type' (Python 2.7). + # Use old style calls, if the type is 'classobj' (Python 2.6) + logging.handlers.SysLogHandler.__init__(self, *args, **kwargs) def format(self, record): - msg = super(RFCSysLogHandler, self).format(record) + # Do not use super() unless type(logging.handlers.SysLogHandler) + # is 'type' (Python 2.7). + # Use old style calls, if the type is 'classobj' (Python 2.6) + msg = logging.handlers.SysLogHandler.format(self, record) msg = self.binary_name + ' ' + msg return msg @@ -487,18 +485,6 @@ def _setup_logging_from_conf(project, version): for handler in log_root.handlers: log_root.removeHandler(handler) - if CONF.use_syslog: - facility = _find_facility_from_conf() - # TODO(bogdando) use the format provided by RFCSysLogHandler - # after existing syslog format deprecation in J - if CONF.use_syslog_rfc_format: - syslog = RFCSysLogHandler(address='/dev/log', - facility=facility) - else: - syslog = logging.handlers.SysLogHandler(address='/dev/log', - facility=facility) - log_root.addHandler(syslog) - logpath = _get_log_file_path() if logpath: filelog = logging.handlers.WatchedFileHandler(logpath) @@ -515,9 +501,14 @@ def _setup_logging_from_conf(project, version): log_root.addHandler(streamlog) if CONF.publish_errors: - handler = importutils.import_object( - "cloudbaseinit.openstack.common.log_handler.PublishErrorsHandler", - logging.ERROR) + try: + handler = importutils.import_object( + "cloudbaseinit.openstack.common.log_handler.PublishErrorsHandler", + logging.ERROR) + except ImportError: + handler = importutils.import_object( + "oslo.messaging.notify.log_handler.PublishErrorsHandler", + logging.ERROR) log_root.addHandler(handler) datefmt = CONF.log_date_format @@ -543,9 +534,29 @@ def _setup_logging_from_conf(project, version): for pair in CONF.default_log_levels: mod, _sep, level_name = pair.partition('=') - level = logging.getLevelName(level_name) logger = logging.getLogger(mod) - logger.setLevel(level) + # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name + # to integer code. + if sys.version_info < (2, 7): + level = logging.getLevelName(level_name) + logger.setLevel(level) + else: + logger.setLevel(level_name) + + if CONF.use_syslog: + try: + facility = _find_facility_from_conf() + # TODO(bogdando) use the format provided by RFCSysLogHandler + # after existing syslog format deprecation in J + if CONF.use_syslog_rfc_format: + syslog = RFCSysLogHandler(facility=facility) + else: + syslog = logging.handlers.SysLogHandler(facility=facility) + log_root.addHandler(syslog) + except socket.error: + log_root.error('Unable to add syslog handler. Verify that syslog' + 'is running.') + _loggers = {} @@ -615,6 +626,12 @@ class ContextFormatter(logging.Formatter): def format(self, record): """Uses contextstring if request_id is set, otherwise default.""" + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(record.msg, six.text_type): + record.msg = six.text_type(record.msg) + # store project info record.project = self.project record.version = self.version @@ -629,19 +646,24 @@ class ContextFormatter(logging.Formatter): # NOTE(sdague): default the fancier formatting params # to an empty string so we don't throw an exception if # they get used - for key in ('instance', 'color'): + for key in ('instance', 'color', 'user_identity'): if key not in record.__dict__: record.__dict__[key] = '' - if record.__dict__.get('request_id', None): - self._fmt = CONF.logging_context_format_string + if record.__dict__.get('request_id'): + fmt = CONF.logging_context_format_string else: - self._fmt = CONF.logging_default_format_string + fmt = CONF.logging_default_format_string if (record.levelno == logging.DEBUG and CONF.logging_debug_format_suffix): - self._fmt += " " + CONF.logging_debug_format_suffix + fmt += " " + CONF.logging_debug_format_suffix + if sys.version_info < (3, 2): + self._fmt = fmt + else: + self._style = logging.PercentStyle(fmt) + self._fmt = self._style._fmt # Cache this on the record, Logger will respect our formatted copy if record.exc_info: record.exc_text = self.formatException(record.exc_info, record) diff --git a/cloudbaseinit/openstack/common/loopingcall.py b/cloudbaseinit/openstack/common/loopingcall.py index b52afd66..f23e7e48 100644 --- a/cloudbaseinit/openstack/common/loopingcall.py +++ b/cloudbaseinit/openstack/common/loopingcall.py @@ -16,31 +16,36 @@ # under the License. import sys +import time from eventlet import event from eventlet import greenthread -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LE, _LW from cloudbaseinit.openstack.common import log as logging -from cloudbaseinit.openstack.common import timeutils LOG = logging.getLogger(__name__) +# NOTE(zyluo): This lambda function was declared to avoid mocking collisions +# with time.time() called in the standard logging module +# during unittests. +_ts = lambda: time.time() + class LoopingCallDone(Exception): - """Exception to break out and stop a LoopingCall. + """Exception to break out and stop a LoopingCallBase. - The poll-function passed to LoopingCall can raise this exception to + The poll-function passed to LoopingCallBase can raise this exception to break out of the loop normally. This is somewhat analogous to StopIteration. An optional return-value can be included as the argument to the exception; - this return-value will be returned by LoopingCall.wait() + this return-value will be returned by LoopingCallBase.wait() """ def __init__(self, retvalue=True): - """:param retvalue: Value that LoopingCall.wait() should return.""" + """:param retvalue: Value that LoopingCallBase.wait() should return.""" self.retvalue = retvalue @@ -72,21 +77,22 @@ class FixedIntervalLoopingCall(LoopingCallBase): try: while self._running: - start = timeutils.utcnow() + start = _ts() self.f(*self.args, **self.kw) - end = timeutils.utcnow() + end = _ts() if not self._running: break - delay = interval - timeutils.delta_seconds(start, end) - if delay <= 0: - LOG.warn(_('task run outlasted interval by %s sec') % - -delay) - greenthread.sleep(delay if delay > 0 else 0) + delay = end - start - interval + if delay > 0: + LOG.warn(_LW('task %(func_name)s run outlasted ' + 'interval by %(delay).2f sec'), + {'func_name': repr(self.f), 'delay': delay}) + greenthread.sleep(-delay if delay < 0 else 0) except LoopingCallDone as e: self.stop() done.send(e.retvalue) except Exception: - LOG.exception(_('in fixed duration looping call')) + LOG.exception(_LE('in fixed duration looping call')) done.send_exception(*sys.exc_info()) return else: @@ -98,11 +104,6 @@ class FixedIntervalLoopingCall(LoopingCallBase): return self.done -# TODO(mikal): this class name is deprecated in Havana and should be removed -# in the I release -LoopingCall = FixedIntervalLoopingCall - - class DynamicLoopingCall(LoopingCallBase): """A looping call which sleeps until the next known event. @@ -126,14 +127,15 @@ class DynamicLoopingCall(LoopingCallBase): if periodic_interval_max is not None: idle = min(idle, periodic_interval_max) - LOG.debug(_('Dynamic looping call sleeping for %.02f ' - 'seconds'), idle) + LOG.debug('Dynamic looping call %(func_name)s sleeping ' + 'for %(idle).02f seconds', + {'func_name': repr(self.f), 'idle': idle}) greenthread.sleep(idle) except LoopingCallDone as e: self.stop() done.send(e.retvalue) except Exception: - LOG.exception(_('in dynamic looping call')) + LOG.exception(_LE('in dynamic looping call')) done.send_exception(*sys.exc_info()) return else: diff --git a/cloudbaseinit/openstack/common/network_utils.py b/cloudbaseinit/openstack/common/network_utils.py index ae8ca881..45a385c9 100644 --- a/cloudbaseinit/openstack/common/network_utils.py +++ b/cloudbaseinit/openstack/common/network_utils.py @@ -17,7 +17,14 @@ Network-related utilities and helper functions. """ -from cloudbaseinit.openstack.common.py3kcompat import urlutils +import logging +import socket + +from six.moves.urllib import parse + +from cloudbaseinit.openstack.common.gettextutils import _LW + +LOG = logging.getLogger(__name__) def parse_host_port(address, default_port=None): @@ -42,8 +49,12 @@ def parse_host_port(address, default_port=None): ('::1', 1234) >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) ('2001:db8:85a3::8a2e:370:7334', 1234) - + >>> parse_host_port(None) + (None, None) """ + if not address: + return (None, None) + if address[0] == '[': # Escaped ipv6 _host, _port = address[1:].split(']') @@ -64,16 +75,89 @@ def parse_host_port(address, default_port=None): return (host, None if port is None else int(port)) +class ModifiedSplitResult(parse.SplitResult): + """Split results class for urlsplit.""" + + # NOTE(dims): The functions below are needed for Python 2.6.x. + # We can remove these when we drop support for 2.6.x. + @property + def hostname(self): + netloc = self.netloc.split('@', 1)[-1] + host, port = parse_host_port(netloc) + return host + + @property + def port(self): + netloc = self.netloc.split('@', 1)[-1] + host, port = parse_host_port(netloc) + return port + + def urlsplit(url, scheme='', allow_fragments=True): """Parse a URL using urlparse.urlsplit(), splitting query and fragments. This function papers over Python issue9374 when needed. The parameters are the same as urlparse.urlsplit. """ - scheme, netloc, path, query, fragment = urlutils.urlsplit( + scheme, netloc, path, query, fragment = parse.urlsplit( url, scheme, allow_fragments) if allow_fragments and '#' in path: path, fragment = path.split('#', 1) if '?' in path: path, query = path.split('?', 1) - return urlutils.SplitResult(scheme, netloc, path, query, fragment) + return ModifiedSplitResult(scheme, netloc, + path, query, fragment) + + +def set_tcp_keepalive(sock, tcp_keepalive=True, + tcp_keepidle=None, + tcp_keepalive_interval=None, + tcp_keepalive_count=None): + """Set values for tcp keepalive parameters + + This function configures tcp keepalive parameters if users wish to do + so. + + :param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are + not sure, this should be True, and default values will be used. + + :param tcp_keepidle: time to wait before starting to send keepalive probes + :param tcp_keepalive_interval: time between successive probes, once the + initial wait time is over + :param tcp_keepalive_count: number of probes to send before the connection + is killed + """ + + # NOTE(praneshp): Despite keepalive being a tcp concept, the level is + # still SOL_SOCKET. This is a quirk. + if isinstance(tcp_keepalive, bool): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive) + else: + raise TypeError("tcp_keepalive must be a boolean") + + if not tcp_keepalive: + return + + # These options aren't available in the OS X version of eventlet, + # Idle + Count * Interval effectively gives you the total timeout. + if tcp_keepidle is not None: + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, + tcp_keepidle) + else: + LOG.warning(_LW('tcp_keepidle not available on your system')) + if tcp_keepalive_interval is not None: + if hasattr(socket, 'TCP_KEEPINTVL'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPINTVL, + tcp_keepalive_interval) + else: + LOG.warning(_LW('tcp_keepintvl not available on your system')) + if tcp_keepalive_count is not None: + if hasattr(socket, 'TCP_KEEPCNT'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPCNT, + tcp_keepalive_count) + else: + LOG.warning(_LW('tcp_keepknt not available on your system')) diff --git a/cloudbaseinit/openstack/common/notifier/api.py b/cloudbaseinit/openstack/common/notifier/api.py index 725aef80..558b9df0 100644 --- a/cloudbaseinit/openstack/common/notifier/api.py +++ b/cloudbaseinit/openstack/common/notifier/api.py @@ -19,7 +19,7 @@ import uuid from oslo.config import cfg from cloudbaseinit.openstack.common import context -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import jsonutils from cloudbaseinit.openstack.common import log as logging @@ -36,7 +36,6 @@ notifier_opts = [ default='INFO', help='Default notification level for outgoing notifications'), cfg.StrOpt('default_publisher_id', - default=None, help='Default publisher_id for outgoing notifications'), ] @@ -142,9 +141,9 @@ def notify(context, publisher_id, event_type, priority, payload): try: driver.notify(context, msg) except Exception as e: - LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system. " - "Payload=%(payload)s") + LOG.exception(_LE("Problem '%(e)s' attempting to " + "send to notification system. " + "Payload=%(payload)s") % dict(e=e, payload=payload)) @@ -161,8 +160,8 @@ def _get_drivers(): driver = importutils.import_module(notification_driver) _drivers[notification_driver] = driver except ImportError: - LOG.exception(_("Failed to load notifier %s. " - "These notifications will not be sent.") % + LOG.exception(_LE("Failed to load notifier %s. " + "These notifications will not be sent.") % notification_driver) return _drivers.values() diff --git a/cloudbaseinit/openstack/common/notifier/proxy.py b/cloudbaseinit/openstack/common/notifier/proxy.py index 8553aa1c..01796ae4 100644 --- a/cloudbaseinit/openstack/common/notifier/proxy.py +++ b/cloudbaseinit/openstack/common/notifier/proxy.py @@ -13,11 +13,10 @@ # under the License. """ -A temporary helper which emulates cloudbaseinit.messaging.Notifier. +A temporary helper which emulates oslo.messaging.Notifier. This helper method allows us to do the tedious porting to the new Notifier API -as a standalone commit so that the commit which switches us to -cloudbaseinit.messaging +as a standalone commit so that the commit which switches us to oslo.messaging is smaller and easier to review. This file will be removed as part of that commit. """ diff --git a/cloudbaseinit/openstack/common/notifier/rpc_notifier.py b/cloudbaseinit/openstack/common/notifier/rpc_notifier.py index f01d68fc..04dcff72 100644 --- a/cloudbaseinit/openstack/common/notifier/rpc_notifier.py +++ b/cloudbaseinit/openstack/common/notifier/rpc_notifier.py @@ -16,7 +16,7 @@ from oslo.config import cfg from cloudbaseinit.openstack.common import context as req_context -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LE from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.openstack.common import rpc @@ -42,6 +42,6 @@ def notify(context, message): try: rpc.notify(context, topic, message) except Exception: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), + LOG.exception(_LE("Could not send notification to %(topic)s. " + "Payload=%(message)s"), {"topic": topic, "message": message}) diff --git a/cloudbaseinit/openstack/common/notifier/rpc_notifier2.py b/cloudbaseinit/openstack/common/notifier/rpc_notifier2.py index db9a68d2..908da9b6 100644 --- a/cloudbaseinit/openstack/common/notifier/rpc_notifier2.py +++ b/cloudbaseinit/openstack/common/notifier/rpc_notifier2.py @@ -18,7 +18,7 @@ from oslo.config import cfg from cloudbaseinit.openstack.common import context as req_context -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LE from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.openstack.common import rpc @@ -48,6 +48,6 @@ def notify(context, message): try: rpc.notify(context, topic, message, envelope=True) except Exception: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), + LOG.exception(_LE("Could not send notification to %(topic)s. " + "Payload=%(message)s"), {"topic": topic, "message": message}) diff --git a/cloudbaseinit/openstack/common/rpc/amqp.py b/cloudbaseinit/openstack/common/rpc/amqp.py index e64bdee1..700d924d 100644 --- a/cloudbaseinit/openstack/common/rpc/amqp.py +++ b/cloudbaseinit/openstack/common/rpc/amqp.py @@ -37,7 +37,7 @@ import six from cloudbaseinit.openstack.common import excutils -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE from cloudbaseinit.openstack.common import local from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.openstack.common.rpc import common as rpc_common @@ -72,7 +72,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug(_('Pool creating new connection')) + LOG.debug('Pool creating new connection') return self.connection_cls(self.conf) def empty(self): @@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection): ack_on_error) def consume_in_thread(self): - self.connection.consume_in_thread() + return self.connection.consume_in_thread() def __getattr__(self, key): """Proxy all other calls to the Connection instance.""" @@ -202,7 +202,7 @@ class ReplyProxy(ConnectionContext): LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s' ', message : %(data)s'), {'msg_id': msg_id, 'data': message_data}) - LOG.warn(_('_call_waiters: %s') % str(self._call_waiters)) + LOG.warn(_('_call_waiters: %s') % self._call_waiters) else: waiter.put(message_data) @@ -287,7 +287,7 @@ def unpack_context(conf, msg): context_dict['reply_q'] = msg.pop('_reply_q', None) context_dict['conf'] = conf ctx = RpcContext.from_dict(context_dict) - rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict()) + rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict()) return ctx @@ -339,7 +339,7 @@ def _add_unique_id(msg): """Add unique_id for checking duplicate messages.""" unique_id = uuid.uuid4().hex msg.update({UNIQUE_ID: unique_id}) - LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + LOG.debug('UNIQUE_ID is %s.' % (unique_id)) class _ThreadPoolWithWait(object): @@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait): # the previous context is stored in local.store.context if hasattr(local.store, 'context'): del local.store.context - rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + rpc_common._safe_log(LOG.debug, 'received %s', message_data) self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') @@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait): # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) except rpc_common.ClientException as e: - LOG.debug(_('Expected exception during message handling (%s)') % + LOG.debug('Expected exception during message handling (%s)' % e._exc_info[1]) ctxt.reply(None, e._exc_info, connection_pool=self.connection_pool, @@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait): except Exception: # sys.exc_info() is deleted by LOG.exception(). exc_info = sys.exc_info() - LOG.error(_('Exception during message handling'), + LOG.error(_LE('Exception during message handling'), exc_info=exc_info) ctxt.reply(None, exc_info, connection_pool=self.connection_pool) @@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore() def multicall(conf, context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" - LOG.debug(_('Making synchronous call on %s ...'), topic) + LOG.debug('Making synchronous call on %s ...', topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - LOG.debug(_('MSG_ID is %s') % (msg_id)) + LOG.debug('MSG_ID is %s' % (msg_id)) _add_unique_id(msg) pack_context(msg, context) @@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool): def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" - LOG.debug(_('Making asynchronous cast on %s...'), topic) + LOG.debug('Making asynchronous cast on %s...', topic) _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: @@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" - LOG.debug(_('Making asynchronous fanout cast...')) + LOG.debug('Making asynchronous fanout cast...') _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: @@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" - LOG.debug(_('Sending %(event_type)s on %(topic)s'), + LOG.debug('Sending %(event_type)s on %(topic)s', dict(event_type=msg.get('event_type'), topic=topic)) _add_unique_id(msg) diff --git a/cloudbaseinit/openstack/common/rpc/common.py b/cloudbaseinit/openstack/common/rpc/common.py index 554bf6ab..061309f0 100644 --- a/cloudbaseinit/openstack/common/rpc/common.py +++ b/cloudbaseinit/openstack/common/rpc/common.py @@ -22,7 +22,7 @@ import traceback from oslo.config import cfg import six -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import jsonutils from cloudbaseinit.openstack.common import local @@ -50,7 +50,7 @@ deserialize_msg(). The current message format (version 2.0) is very simple. It is:: { - 'cloudbaseinit.version': , + 'oslo.version': , 'cloudbaseinit.message': } @@ -66,7 +66,7 @@ which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_VERSION_KEY = 'cloudbaseinit.version' +_VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'cloudbaseinit.message' _REMOTE_POSTFIX = '_Remote' @@ -85,7 +85,7 @@ class RPCException(Exception): except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs - LOG.exception(_('Exception in string format operation')) + LOG.exception(_LE('Exception in string format operation')) for name, value in six.iteritems(kwargs): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened @@ -289,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True): tb = traceback.format_exception(*failure_info) failure = failure_info[1] if log_failure: - LOG.error(_("Returning exception %s to caller"), + LOG.error(_LE("Returning exception %s to caller"), six.text_type(failure)) LOG.error(tb) diff --git a/cloudbaseinit/openstack/common/rpc/impl_fake.py b/cloudbaseinit/openstack/common/rpc/impl_fake.py index eafa84fe..4f1f1999 100644 --- a/cloudbaseinit/openstack/common/rpc/impl_fake.py +++ b/cloudbaseinit/openstack/common/rpc/impl_fake.py @@ -140,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None): if not method: return args = msg.get('args', {}) - version = msg.get('version', None) - namespace = msg.get('namespace', None) + version = msg.get('version') + namespace = msg.get('namespace') try: consumer = CONSUMERS[topic][0] @@ -185,8 +185,8 @@ def fanout_cast(conf, context, topic, msg): if not method: return args = msg.get('args', {}) - version = msg.get('version', None) - namespace = msg.get('namespace', None) + version = msg.get('version') + namespace = msg.get('namespace') for consumer in CONSUMERS.get(topic, []): try: diff --git a/cloudbaseinit/openstack/common/rpc/impl_kombu.py b/cloudbaseinit/openstack/common/rpc/impl_kombu.py index 7f44a965..79d2e3a0 100644 --- a/cloudbaseinit/openstack/common/rpc/impl_kombu.py +++ b/cloudbaseinit/openstack/common/rpc/impl_kombu.py @@ -29,7 +29,7 @@ from oslo.config import cfg import six from cloudbaseinit.openstack.common import excutils -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI from cloudbaseinit.openstack.common import network_utils from cloudbaseinit.openstack.common.rpc import amqp as rpc_amqp from cloudbaseinit.openstack.common.rpc import common as rpc_common @@ -50,8 +50,12 @@ kombu_opts = [ help='SSL cert file (valid only if SSL enabled)'), cfg.StrOpt('kombu_ssl_ca_certs', default='', - help=('SSL certification authority file ' - '(valid only if SSL enabled)')), + help='SSL certification authority file ' + '(valid only if SSL enabled)'), + cfg.FloatOpt('kombu_reconnect_delay', + default=1.0, + help='How long to wait before reconnecting in response to an ' + 'AMQP consumer cancel notification.'), cfg.StrOpt('rabbit_host', default='localhost', help='The RabbitMQ broker address where a single node is used'), @@ -153,12 +157,12 @@ class ConsumerBase(object): callback(msg) except Exception: if self.ack_on_error: - LOG.exception(_("Failed to process message" - " ... skipping it.")) + LOG.exception(_LE("Failed to process message" + " ... skipping it.")) message.ack() else: - LOG.exception(_("Failed to process message" - " ... will requeue.")) + LOG.exception(_LE("Failed to process message" + " ... will requeue.")) message.requeue() else: message.ack() @@ -458,6 +462,9 @@ class Connection(object): self.params_list = params_list + brokers_count = len(self.params_list) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.memory_transport = self.conf.fake_rabbit self.connection = None @@ -492,9 +499,20 @@ class Connection(object): be handled by the caller. """ if self.connection: - LOG.info(_("Reconnecting to AMQP server on " + LOG.info(_LI("Reconnecting to AMQP server on " "%(hostname)s:%(port)d") % params) try: + # XXX(nic): when reconnecting to a RabbitMQ cluster + # with mirrored queues in use, the attempt to release the + # connection can hang "indefinitely" somewhere deep down + # in Kombu. Blocking the thread for a bit prior to + # release seems to kludge around the problem where it is + # otherwise reproduceable. + if self.conf.kombu_reconnect_delay > 0: + LOG.info(_("Delaying reconnect for %1.1f seconds...") % + self.conf.kombu_reconnect_delay) + time.sleep(self.conf.kombu_reconnect_delay) + self.connection.release() except self.connection_errors: pass @@ -514,7 +532,7 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') % params) def reconnect(self): @@ -528,7 +546,7 @@ class Connection(object): attempt = 0 while True: - params = self.params_list[attempt % len(self.params_list)] + params = self.params_list[next(self.next_broker_indices)] attempt += 1 try: self._connect(params) @@ -546,7 +564,7 @@ class Connection(object): raise log_info = {} - log_info['err_str'] = str(e) + log_info['err_str'] = e log_info['max_retries'] = self.max_retries log_info.update(params) @@ -565,9 +583,9 @@ class Connection(object): sleep_time = min(sleep_time, self.interval_max) log_info['sleep_time'] = sleep_time - LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' - 'unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -618,8 +636,8 @@ class Connection(object): """ def _connect_error(exc): - log_info = {'topic': topic, 'err_str': str(exc)} - LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + log_info = {'topic': topic, 'err_str': exc} + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s") % log_info) def _declare_consumer(): @@ -637,12 +655,12 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): - LOG.debug(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug('Timed out waiting for RPC response: %s' % + exc) raise rpc_common.Timeout() else: - LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + LOG.exception(_LE('Failed to consume message from queue: %s') % + exc) info['do_consume'] = True def _consume(): @@ -679,8 +697,8 @@ class Connection(object): """Send to a publisher based on the publisher class.""" def _error_callback(exc): - log_info = {'topic': topic, 'err_str': str(exc)} - LOG.exception(_("Failed to publish message to topic " + log_info = {'topic': topic, 'err_str': exc} + LOG.exception(_LE("Failed to publish message to topic " "'%(topic)s': %(err_str)s") % log_info) def _publish(): diff --git a/cloudbaseinit/openstack/common/rpc/impl_qpid.py b/cloudbaseinit/openstack/common/rpc/impl_qpid.py index 9cf2677b..47a7674c 100644 --- a/cloudbaseinit/openstack/common/rpc/impl_qpid.py +++ b/cloudbaseinit/openstack/common/rpc/impl_qpid.py @@ -23,7 +23,7 @@ from oslo.config import cfg import six from cloudbaseinit.openstack.common import excutils -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import jsonutils from cloudbaseinit.openstack.common import log as logging @@ -188,7 +188,7 @@ class ConsumerBase(object): msg = rpc_common.deserialize_msg(message.content) self.callback(msg) except Exception: - LOG.exception(_("Failed to process message... skipping it.")) + LOG.exception(_LE("Failed to process message... skipping it.")) finally: # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) @@ -224,7 +224,7 @@ class DirectConsumer(ConsumerBase): elif conf.qpid_topology_version == 2: node_name = "amq.direct/%s" % msg_id node_opts = {} - link_name = None + link_name = msg_id else: raise_invalid_topology_version() @@ -368,7 +368,7 @@ class DirectPublisher(Publisher): """Init a 'direct' publisher.""" if conf.qpid_topology_version == 1: - node_name = msg_id + node_name = "%s/%s" % (msg_id, msg_id) node_opts = {"type": "direct"} elif conf.qpid_topology_version == 2: node_name = "amq.direct/%s" % msg_id @@ -444,6 +444,7 @@ class Connection(object): if not qpid_messaging: raise ImportError("Failed to import qpid.messaging") + self.connection = None self.session = None self.consumers = {} self.consumer_thread = None @@ -467,7 +468,10 @@ class Connection(object): self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create(self.brokers[0]) + + brokers_count = len(self.brokers) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.reconnect() def connection_create(self, broker): @@ -494,31 +498,29 @@ class Connection(object): def reconnect(self): """Handles reconnecting and re-establishing sessions and queues.""" - attempt = 0 delay = 1 while True: # Close the session if necessary - if self.connection.opened(): + if self.connection is not None and self.connection.opened(): try: self.connection.close() - except qpid_exceptions.ConnectionError: + except qpid_exceptions.MessagingError: pass - broker = self.brokers[attempt % len(self.brokers)] - attempt += 1 + broker = self.brokers[next(self.next_broker_indices)] try: self.connection_create(broker) self.connection.open() - except qpid_exceptions.ConnectionError as e: + except qpid_exceptions.MessagingError as e: msg_dict = dict(e=e, delay=delay) - msg = _("Unable to connect to AMQP server: %(e)s. " - "Sleeping %(delay)s seconds") % msg_dict + msg = _LE("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict LOG.error(msg) time.sleep(delay) - delay = min(2 * delay, 60) + delay = min(delay + 1, 5) else: - LOG.info(_('Connected to AMQP server on %s'), broker) + LOG.info(_LI('Connected to AMQP server on %s'), broker) break self.session = self.connection.session() @@ -531,14 +533,14 @@ class Connection(object): consumer.reconnect(self.session) self._register_consumer(consumer) - LOG.debug(_("Re-established AMQP queues")) + LOG.debug("Re-established AMQP queues") def ensure(self, error_callback, method, *args, **kwargs): while True: try: return method(*args, **kwargs) except (qpid_exceptions.Empty, - qpid_exceptions.ConnectionError) as e: + qpid_exceptions.MessagingError) as e: if error_callback: error_callback(e) self.reconnect() @@ -569,8 +571,8 @@ class Connection(object): add it to our list of consumers """ def _connect_error(exc): - log_info = {'topic': topic, 'err_str': str(exc)} - LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + log_info = {'topic': topic, 'err_str': exc} + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s") % log_info) def _declare_consumer(): @@ -585,19 +587,19 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.debug(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug('Timed out waiting for RPC response: %s' % + exc) raise rpc_common.Timeout() else: - LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + LOG.exception(_LE('Failed to consume message from queue: %s') % + exc) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) try: self._lookup_consumer(nxt_receiver).consume() except Exception: - LOG.exception(_("Error processing message. Skipping it.")) + LOG.exception(_LE("Error processing message. Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: @@ -623,8 +625,8 @@ class Connection(object): """Send to a publisher based on the publisher class.""" def _connect_error(exc): - log_info = {'topic': topic, 'err_str': str(exc)} - LOG.exception(_("Failed to publish message to topic " + log_info = {'topic': topic, 'err_str': exc} + LOG.exception(_LE("Failed to publish message to topic " "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): diff --git a/cloudbaseinit/openstack/common/rpc/impl_zmq.py b/cloudbaseinit/openstack/common/rpc/impl_zmq.py index f111ff3c..783eefad 100644 --- a/cloudbaseinit/openstack/common/rpc/impl_zmq.py +++ b/cloudbaseinit/openstack/common/rpc/impl_zmq.py @@ -27,7 +27,7 @@ import six from six import moves from cloudbaseinit.openstack.common import excutils -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LE, _LI from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import jsonutils from cloudbaseinit.openstack.common.rpc import common as rpc_common @@ -63,7 +63,7 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), - cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + cfg.IntOpt('rpc_zmq_topic_backlog', help='Maximum number of ingress messages to locally buffer ' 'per topic. Default is unlimited.'), @@ -93,12 +93,12 @@ def _serialize(data): return jsonutils.dumps(data, ensure_ascii=True) except TypeError: with excutils.save_and_reraise_exception(): - LOG.error(_("JSON serialization failed.")) + LOG.error(_LE("JSON serialization failed.")) def _deserialize(data): """Deserialization wrapper.""" - LOG.debug(_("Deserializing: %s"), data) + LOG.debug("Deserializing: %s", data) return jsonutils.loads(data) @@ -133,9 +133,9 @@ class ZmqSocket(object): str_data = {'addr': addr, 'type': self.socket_s(), 'subscribe': subscribe, 'bind': bind} - LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data) - LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) - LOG.debug(_("-> bind: %(bind)s"), str_data) + LOG.debug("Connecting to %(addr)s with %(type)s", str_data) + LOG.debug("-> Subscribed to %(subscribe)s", str_data) + LOG.debug("-> bind: %(bind)s", str_data) try: if bind: @@ -155,7 +155,7 @@ class ZmqSocket(object): """Subscribe.""" if not self.can_sub: raise RPCException("Cannot subscribe on this socket.") - LOG.debug(_("Subscribing to %s"), msg_filter) + LOG.debug("Subscribing to %s", msg_filter) try: self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) @@ -192,7 +192,7 @@ class ZmqSocket(object): # it would be much worse if some of the code calling this # were to fail. For now, lets log, and later evaluate # if we can safely raise here. - LOG.error(_("ZeroMQ socket could not be closed.")) + LOG.error(_LE("ZeroMQ socket could not be closed.")) self.sock = None def recv(self, **kwargs): @@ -264,7 +264,7 @@ class InternalContext(object): def _get_response(self, ctx, proxy, topic, data): """Process a curried message and cast the result to topic.""" - LOG.debug(_("Running func with context: %s"), ctx.to_dict()) + LOG.debug("Running func with context: %s", ctx.to_dict()) data.setdefault('version', None) data.setdefault('args', {}) @@ -277,13 +277,13 @@ class InternalContext(object): # ignore these since they are just from shutdowns pass except rpc_common.ClientException as e: - LOG.debug(_("Expected exception during message handling (%s)") % + LOG.debug("Expected exception during message handling (%s)" % e._exc_info[1]) return {'exc': rpc_common.serialize_remote_exception(e._exc_info, log_failure=False)} except Exception: - LOG.error(_("Exception during message handling")) + LOG.error(_LE("Exception during message handling")) return {'exc': rpc_common.serialize_remote_exception(sys.exc_info())} @@ -302,7 +302,7 @@ class InternalContext(object): self._get_response(ctx, proxy, topic, payload), ctx.replies) - LOG.debug(_("Sending reply")) + LOG.debug("Sending reply") _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { @@ -320,7 +320,7 @@ class ConsumerBase(object): @classmethod def normalize_reply(self, result, replies): - #TODO(ewindisch): re-evaluate and document this method. + # TODO(ewindisch): re-evaluate and document this method. if isinstance(result, types.GeneratorType): return list(result) elif replies: @@ -336,7 +336,7 @@ class ConsumerBase(object): # processed internally. (non-valid method name) method = data.get('method') if not method: - LOG.error(_("RPC message did not include method.")) + LOG.error(_LE("RPC message did not include method.")) return # Internal method @@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase): def register(self, proxy, in_addr, zmq_type_in, in_bind=True, subscribe=None): - LOG.info(_("Registering reactor")) + LOG.info(_LI("Registering reactor")) if zmq_type_in not in (zmq.PULL, zmq.SUB): raise RPCException("Bad input socktype") @@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase): self.proxies[inq] = proxy self.sockets.append(inq) - LOG.info(_("In reactor registered")) + LOG.info(_LI("In reactor registered")) def consume_in_thread(self): @excutils.forever_retry_uncaught_exceptions def _consume(sock): - LOG.info(_("Consuming socket")) + LOG.info(_LI("Consuming socket")) while True: self.consume(sock) @@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor): if topic not in self.topic_proxy: def publisher(waiter): - LOG.info(_("Creating proxy for topic: %s"), topic) + LOG.info(_LI("Creating proxy for topic: %s"), topic) try: # The topic is received over the network, @@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor): try: wait_sock_creation.wait() except RPCException: - LOG.error(_("Topic socket file creation failed.")) + LOG.error(_LE("Topic socket file creation failed.")) return try: self.topic_proxy[topic].put_nowait(data) except eventlet.queue.Full: - LOG.error(_("Local per-topic backlog buffer full for topic " - "%(topic)s. Dropping message.") % {'topic': topic}) + LOG.error(_LE("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) def consume_in_thread(self): """Runs the ZmqProxy service.""" @@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor): except os.error: if not os.path.isdir(ipc_dir): with excutils.save_and_reraise_exception(): - LOG.error(_("Required IPC directory does not exist at" - " %s") % (ipc_dir, )) + LOG.error(_LE("Required IPC directory does not exist at" + " %s") % (ipc_dir, )) try: self.register(consumption_proxy, consume_in, @@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor): except zmq.ZMQError: if os.access(ipc_dir, os.X_OK): with excutils.save_and_reraise_exception(): - LOG.error(_("Permission denied to IPC directory at" - " %s") % (ipc_dir, )) + LOG.error(_LE("Permission denied to IPC directory at" + " %s") % (ipc_dir, )) with excutils.save_and_reraise_exception(): - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) + LOG.error(_LE("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) super(ZmqProxy, self).consume_in_thread() @@ -539,9 +539,9 @@ class ZmqReactor(ZmqBaseReactor): super(ZmqReactor, self).__init__(conf) def consume(self, sock): - #TODO(ewindisch): use zero-copy (i.e. references, not copying) + # TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) + LOG.debug("CONSUMER RECEIVED DATA: %s", data) proxy = self.proxies[sock] @@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor): # Unmarshal only after verifying the message. ctx = RpcContext.unmarshal(data[3]) else: - LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + LOG.error(_LE("ZMQ Envelope version unsupported or unknown.")) return self.pool.spawn_n(self.process, proxy, ctx, request) @@ -588,14 +588,14 @@ class Connection(rpc_common.Connection): topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) if topic in self.topics: - LOG.info(_("Skipping topic registration. Already registered.")) + LOG.info(_LI("Skipping topic registration. Already registered.")) return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ (CONF.rpc_zmq_ipc_dir, topic) - LOG.debug(_("Consumer is a zmq.%s"), + LOG.debug("Consumer is a zmq.%s", ['PULL', 'SUB'][sock_type == zmq.SUB]) self.reactor.register(proxy, inaddr, sock_type, @@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None, # Replies always come into the reply service. reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host - LOG.debug(_("Creating payload")) + LOG.debug("Creating payload") # Curry the original request into a reply method. mcontext = RpcContext.marshal(context) payload = { @@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None, } } - LOG.debug(_("Creating queue socket for reply waiter")) + LOG.debug("Creating queue socket for reply waiter") # Messages arriving async. # TODO(ewindisch): have reply consumer with dynamic subscription mgmt @@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None, zmq.SUB, subscribe=msg_id, bind=False ) - LOG.debug(_("Sending cast")) + LOG.debug("Sending cast") _cast(addr, context, topic, payload, envelope) - LOG.debug(_("Cast sent; Waiting reply")) + LOG.debug("Cast sent; Waiting reply") # Blocks until receives reply msg = msg_waiter.recv() - LOG.debug(_("Received message: %s"), msg) - LOG.debug(_("Unpacking response")) + LOG.debug("Received message: %s", msg) + LOG.debug("Unpacking response") if msg[2] == 'cast': # Legacy version raw_msg = _deserialize(msg[-1])[-1] @@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None, Dispatches to the matchmaker and sends message to all relevant hosts. """ conf = CONF - LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) + LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))}) queues = _get_matchmaker().queues(topic) - LOG.debug(_("Sending message(s) to: %s"), queues) + LOG.debug("Sending message(s) to: %s", queues) # Don't stack if we have no matchmaker results if not queues: diff --git a/cloudbaseinit/openstack/common/rpc/matchmaker.py b/cloudbaseinit/openstack/common/rpc/matchmaker.py index 14fcbe94..07401704 100644 --- a/cloudbaseinit/openstack/common/rpc/matchmaker.py +++ b/cloudbaseinit/openstack/common/rpc/matchmaker.py @@ -22,7 +22,7 @@ import contextlib import eventlet from oslo.config import cfg -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _, _LI from cloudbaseinit.openstack.common import log as logging @@ -127,10 +127,10 @@ class MatchMakerBase(object): def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) - #NOTE(ewindisch): kept the following method in case we implement the - # underlying support. - #def add_negate_binding(self, binding, rule, last=True): - # self.bindings.append((binding, rule, True, last)) + # NOTE(ewindisch): kept the following method in case we implement the + # underlying support. + # def add_negate_binding(self, binding, rule, last=True): + # self.bindings.append((binding, rule, True, last)) def queues(self, key): workers = [] @@ -213,7 +213,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase): self.hosts.discard(host) self.backend_unregister(key, '.'.join((key, host))) - LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"), + LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"), {'key': key, 'host': host}) def start_heartbeat(self): diff --git a/cloudbaseinit/openstack/common/rpc/matchmaker_redis.py b/cloudbaseinit/openstack/common/rpc/matchmaker_redis.py index abdfe42b..7939296f 100644 --- a/cloudbaseinit/openstack/common/rpc/matchmaker_redis.py +++ b/cloudbaseinit/openstack/common/rpc/matchmaker_redis.py @@ -34,7 +34,6 @@ matchmaker_redis_opts = [ default=6379, help='Use this port to connect to redis host.'), cfg.StrOpt('password', - default=None, help='Password for Redis server. (optional)'), ] diff --git a/cloudbaseinit/openstack/common/rpc/matchmaker_ring.py b/cloudbaseinit/openstack/common/rpc/matchmaker_ring.py index fbd87ddb..4def6aeb 100644 --- a/cloudbaseinit/openstack/common/rpc/matchmaker_ring.py +++ b/cloudbaseinit/openstack/common/rpc/matchmaker_ring.py @@ -22,7 +22,7 @@ import json from oslo.config import cfg -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LW from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.openstack.common.rpc import matchmaker as mm @@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange): def run(self, key): if not self._ring_has(key): LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (key, ) + _LW("No key defining hosts for topic '%s', " + "see ringfile") % (key, ) ) return [] host = next(self.ring0[key]) @@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange): nkey = key.split('fanout~')[1:][0] if not self._ring_has(nkey): LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (nkey, ) + _LW("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) ) return [] return map(lambda x: (key + '.' + x, x), self.ring[nkey]) diff --git a/cloudbaseinit/openstack/common/rpc/proxy.py b/cloudbaseinit/openstack/common/rpc/proxy.py index 2f5c8c37..8c1a5179 100644 --- a/cloudbaseinit/openstack/common/rpc/proxy.py +++ b/cloudbaseinit/openstack/common/rpc/proxy.py @@ -48,7 +48,7 @@ class RpcProxy(object): basis. :param version_cap: Optionally cap the maximum version used for sent messages. - :param serializer: Optionaly (de-)serialize entities with a + :param serializer: Optionally (de-)serialize entities with a provided helper. """ self.topic = topic diff --git a/cloudbaseinit/openstack/common/rpc/service.py b/cloudbaseinit/openstack/common/rpc/service.py index 2fe2f62c..b2b800d1 100644 --- a/cloudbaseinit/openstack/common/rpc/service.py +++ b/cloudbaseinit/openstack/common/rpc/service.py @@ -15,7 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudbaseinit.openstack.common.gettextutils import _ from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.openstack.common import rpc from cloudbaseinit.openstack.common.rpc import dispatcher as rpc_dispatcher @@ -44,7 +43,7 @@ class Service(service.Service): super(Service, self).start() self.conn = rpc.create_connection(new=True) - LOG.debug(_("Creating Consumer connection for Service %s") % + LOG.debug("Creating Consumer connection for Service %s" % self.topic) dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], diff --git a/cloudbaseinit/openstack/common/service.py b/cloudbaseinit/openstack/common/service.py index be532243..631c9a00 100644 --- a/cloudbaseinit/openstack/common/service.py +++ b/cloudbaseinit/openstack/common/service.py @@ -38,9 +38,10 @@ from eventlet import event from oslo.config import cfg from cloudbaseinit.openstack.common import eventlet_backdoor -from cloudbaseinit.openstack.common.gettextutils import _ +from cloudbaseinit.openstack.common.gettextutils import _LE, _LI, _LW from cloudbaseinit.openstack.common import importutils from cloudbaseinit.openstack.common import log as logging +from cloudbaseinit.openstack.common import systemd from cloudbaseinit.openstack.common import threadgroup @@ -163,7 +164,7 @@ class ServiceLauncher(Launcher): status = None signo = 0 - LOG.debug(_('Full set of CONF:')) + LOG.debug('Full set of CONF:') CONF.log_opt_values(LOG, std_logging.DEBUG) try: @@ -172,7 +173,7 @@ class ServiceLauncher(Launcher): super(ServiceLauncher, self).wait() except SignalExit as exc: signame = _signo_to_signame(exc.signo) - LOG.info(_('Caught %s, exiting'), signame) + LOG.info(_LI('Caught %s, exiting'), signame) status = exc.code signo = exc.signo except SystemExit as exc: @@ -184,11 +185,12 @@ class ServiceLauncher(Launcher): rpc.cleanup() except Exception: # We're shutting down, so it doesn't matter at this point. - LOG.exception(_('Exception during rpc cleanup.')) + LOG.exception(_LE('Exception during rpc cleanup.')) return status, signo def wait(self, ready_callback=None): + systemd.notify_once() while True: self.handle_signal() status, signo = self._wait_for_exit_or_signal(ready_callback) @@ -235,7 +237,7 @@ class ProcessLauncher(object): # dies unexpectedly self.readpipe.read() - LOG.info(_('Parent process has died unexpectedly, exiting')) + LOG.info(_LI('Parent process has died unexpectedly, exiting')) sys.exit(1) @@ -266,13 +268,13 @@ class ProcessLauncher(object): launcher.wait() except SignalExit as exc: signame = _signo_to_signame(exc.signo) - LOG.info(_('Caught %s, exiting'), signame) + LOG.info(_LI('Child caught %s, exiting'), signame) status = exc.code signo = exc.signo except SystemExit as exc: status = exc.code except BaseException: - LOG.exception(_('Unhandled exception')) + LOG.exception(_LE('Unhandled exception')) status = 2 finally: launcher.stop() @@ -305,7 +307,7 @@ class ProcessLauncher(object): # start up quickly but ensure we don't fork off children that # die instantly too quickly. if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_('Forking too fast, sleeping')) + LOG.info(_LI('Forking too fast, sleeping')) time.sleep(1) wrap.forktimes.pop(0) @@ -324,7 +326,7 @@ class ProcessLauncher(object): os._exit(status) - LOG.info(_('Started child %d'), pid) + LOG.info(_LI('Started child %d'), pid) wrap.children.add(pid) self.children[pid] = wrap @@ -334,7 +336,7 @@ class ProcessLauncher(object): def launch_service(self, service, workers=1): wrap = ServiceWrapper(service, workers) - LOG.info(_('Starting %d workers'), wrap.workers) + LOG.info(_LI('Starting %d workers'), wrap.workers) while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap) @@ -351,15 +353,15 @@ class ProcessLauncher(object): if os.WIFSIGNALED(status): sig = os.WTERMSIG(status) - LOG.info(_('Child %(pid)d killed by signal %(sig)d'), + LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), dict(pid=pid, sig=sig)) else: code = os.WEXITSTATUS(status) - LOG.info(_('Child %(pid)s exited with status %(code)d'), + LOG.info(_LI('Child %(pid)s exited with status %(code)d'), dict(pid=pid, code=code)) if pid not in self.children: - LOG.warning(_('pid %d not in child list'), pid) + LOG.warning(_LW('pid %d not in child list'), pid) return None wrap = self.children.pop(pid) @@ -381,23 +383,35 @@ class ProcessLauncher(object): def wait(self): """Loop waiting on children to die and respawning as necessary.""" - LOG.debug(_('Full set of CONF:')) + systemd.notify_once() + LOG.debug('Full set of CONF:') CONF.log_opt_values(LOG, std_logging.DEBUG) - while True: - self.handle_signal() - self._respawn_children() - if self.sigcaught: + try: + while True: + self.handle_signal() + self._respawn_children() + # No signal means that stop was called. Don't clean up here. + if not self.sigcaught: + return + signame = _signo_to_signame(self.sigcaught) - LOG.info(_('Caught %s, stopping children'), signame) - if not _is_sighup_and_daemon(self.sigcaught): - break + LOG.info(_LI('Caught %s, stopping children'), signame) + if not _is_sighup_and_daemon(self.sigcaught): + break - for pid in self.children: - os.kill(pid, signal.SIGHUP) - self.running = True - self.sigcaught = None + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None + except eventlet.greenlet.GreenletExit: + LOG.info(_LI("Wait called after thread killed. Cleaning up.")) + self.stop() + + def stop(self): + """Terminate child processes and wait on each.""" + self.running = False for pid in self.children: try: os.kill(pid, signal.SIGTERM) @@ -407,7 +421,7 @@ class ProcessLauncher(object): # Wait for children to die if self.children: - LOG.info(_('Waiting on %d children to exit'), len(self.children)) + LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) while self.children: self._wait_child() diff --git a/cloudbaseinit/openstack/common/sslutils.py b/cloudbaseinit/openstack/common/sslutils.py index 61de6d91..6d392a32 100644 --- a/cloudbaseinit/openstack/common/sslutils.py +++ b/cloudbaseinit/openstack/common/sslutils.py @@ -22,17 +22,14 @@ from cloudbaseinit.openstack.common.gettextutils import _ ssl_opts = [ cfg.StrOpt('ca_file', - default=None, help="CA certificate file to use to verify " - "connecting clients"), + "connecting clients."), cfg.StrOpt('cert_file', - default=None, help="Certificate file to use when starting " - "the server securely"), + "the server securely."), cfg.StrOpt('key_file', - default=None, help="Private key file to use when starting " - "the server securely"), + "the server securely."), ] diff --git a/cloudbaseinit/openstack/common/strutils.py b/cloudbaseinit/openstack/common/strutils.py new file mode 100644 index 00000000..ae6dc021 --- /dev/null +++ b/cloudbaseinit/openstack/common/strutils.py @@ -0,0 +1,311 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +System-level utilities and helper functions. +""" + +import math +import re +import sys +import unicodedata + +import six + +from cloudbaseinit.openstack.common.gettextutils import _ + + +UNIT_PREFIX_EXPONENT = { + 'k': 1, + 'K': 1, + 'Ki': 1, + 'M': 2, + 'Mi': 2, + 'G': 3, + 'Gi': 3, + 'T': 4, + 'Ti': 4, +} +UNIT_SYSTEM_INFO = { + 'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')), + 'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')), +} + +TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes') +FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no') + +SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]") +SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+") + + +# NOTE(flaper87): The following globals are used by `mask_password` +_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password'] + +# NOTE(ldbragst): Let's build a list of regex objects using the list of +# _SANITIZE_KEYS we already have. This way, we only have to add the new key +# to the list of _SANITIZE_KEYS and we can generate regular expressions +# for XML and JSON automatically. +_SANITIZE_PATTERNS_2 = [] +_SANITIZE_PATTERNS_1 = [] + +# NOTE(amrith): Some regular expressions have only one parameter, some +# have two parameters. Use different lists of patterns here. +_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+'] +_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', + r'(%(key)s\s+[\"\']).*?([\"\'])', + r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)', + r'(<%(key)s>).*?()', + r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', + r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])', + r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?' + '[\'"]).*?([\'"])', + r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)'] + +for key in _SANITIZE_KEYS: + for pattern in _FORMAT_PATTERNS_2: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS_2.append(reg_ex) + + for pattern in _FORMAT_PATTERNS_1: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS_1.append(reg_ex) + + +def int_from_bool_as_string(subject): + """Interpret a string as a boolean and return either 1 or 0. + + Any string value in: + + ('True', 'true', 'On', 'on', '1') + + is interpreted as a boolean True. + + Useful for JSON-decoded stuff and config file parsing + """ + return bool_from_string(subject) and 1 or 0 + + +def bool_from_string(subject, strict=False, default=False): + """Interpret a string as a boolean. + + A case-insensitive match is performed such that strings matching 't', + 'true', 'on', 'y', 'yes', or '1' are considered True and, when + `strict=False`, anything else returns the value specified by 'default'. + + Useful for JSON-decoded stuff and config file parsing. + + If `strict=True`, unrecognized values, including None, will raise a + ValueError which is useful when parsing values passed in from an API call. + Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'. + """ + if not isinstance(subject, six.string_types): + subject = six.text_type(subject) + + lowered = subject.strip().lower() + + if lowered in TRUE_STRINGS: + return True + elif lowered in FALSE_STRINGS: + return False + elif strict: + acceptable = ', '.join( + "'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS)) + msg = _("Unrecognized value '%(val)s', acceptable values are:" + " %(acceptable)s") % {'val': subject, + 'acceptable': acceptable} + raise ValueError(msg) + else: + return default + + +def safe_decode(text, incoming=None, errors='strict'): + """Decodes incoming text/bytes string using `incoming` if they're not + already unicode. + + :param incoming: Text's current encoding + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: text or a unicode `incoming` encoded + representation of it. + :raises TypeError: If text is not an instance of str + """ + if not isinstance(text, (six.string_types, six.binary_type)): + raise TypeError("%s can't be decoded" % type(text)) + + if isinstance(text, six.text_type): + return text + + if not incoming: + incoming = (sys.stdin.encoding or + sys.getdefaultencoding()) + + try: + return text.decode(incoming, errors) + except UnicodeDecodeError: + # Note(flaper87) If we get here, it means that + # sys.stdin.encoding / sys.getdefaultencoding + # didn't return a suitable encoding to decode + # text. This happens mostly when global LANG + # var is not set correctly and there's no + # default encoding. In this case, most likely + # python will use ASCII or ANSI encoders as + # default encodings but they won't be capable + # of decoding non-ASCII characters. + # + # Also, UTF-8 is being used since it's an ASCII + # extension. + return text.decode('utf-8', errors) + + +def safe_encode(text, incoming=None, + encoding='utf-8', errors='strict'): + """Encodes incoming text/bytes string using `encoding`. + + If incoming is not specified, text is expected to be encoded with + current python's default encoding. (`sys.getdefaultencoding`) + + :param incoming: Text's current encoding + :param encoding: Expected encoding for text (Default UTF-8) + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: text or a bytestring `encoding` encoded + representation of it. + :raises TypeError: If text is not an instance of str + """ + if not isinstance(text, (six.string_types, six.binary_type)): + raise TypeError("%s can't be encoded" % type(text)) + + if not incoming: + incoming = (sys.stdin.encoding or + sys.getdefaultencoding()) + + if isinstance(text, six.text_type): + return text.encode(encoding, errors) + elif text and encoding != incoming: + # Decode text before encoding it with `encoding` + text = safe_decode(text, incoming, errors) + return text.encode(encoding, errors) + else: + return text + + +def string_to_bytes(text, unit_system='IEC', return_int=False): + """Converts a string into an float representation of bytes. + + The units supported for IEC :: + + Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it) + KB, KiB, MB, MiB, GB, GiB, TB, TiB + + The units supported for SI :: + + kb(it), Mb(it), Gb(it), Tb(it) + kB, MB, GB, TB + + Note that the SI unit system does not support capital letter 'K' + + :param text: String input for bytes size conversion. + :param unit_system: Unit system for byte size conversion. + :param return_int: If True, returns integer representation of text + in bytes. (default: decimal) + :returns: Numerical representation of text in bytes. + :raises ValueError: If text has an invalid value. + + """ + try: + base, reg_ex = UNIT_SYSTEM_INFO[unit_system] + except KeyError: + msg = _('Invalid unit system: "%s"') % unit_system + raise ValueError(msg) + match = reg_ex.match(text) + if match: + magnitude = float(match.group(1)) + unit_prefix = match.group(2) + if match.group(3) in ['b', 'bit']: + magnitude /= 8 + else: + msg = _('Invalid string format: %s') % text + raise ValueError(msg) + if not unit_prefix: + res = magnitude + else: + res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix]) + if return_int: + return int(math.ceil(res)) + return res + + +def to_slug(value, incoming=None, errors="strict"): + """Normalize string. + + Convert to lowercase, remove non-word characters, and convert spaces + to hyphens. + + Inspired by Django's `slugify` filter. + + :param value: Text to slugify + :param incoming: Text's current encoding + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: slugified unicode representation of `value` + :raises TypeError: If text is not an instance of str + """ + value = safe_decode(value, incoming, errors) + # NOTE(aababilov): no need to use safe_(encode|decode) here: + # encodings are always "ascii", error handling is always "ignore" + # and types are always known (first: unicode; second: str) + value = unicodedata.normalize("NFKD", value).encode( + "ascii", "ignore").decode("ascii") + value = SLUGIFY_STRIP_RE.sub("", value).strip().lower() + return SLUGIFY_HYPHENATE_RE.sub("-", value) + + +def mask_password(message, secret="***"): + """Replace password with 'secret' in message. + + :param message: The string which includes security information. + :param secret: value with which to replace passwords. + :returns: The unicode value of message with the password fields masked. + + For example: + + >>> mask_password("'adminPass' : 'aaaaa'") + "'adminPass' : '***'" + >>> mask_password("'admin_pass' : 'aaaaa'") + "'admin_pass' : '***'" + >>> mask_password('"password" : "aaaaa"') + '"password" : "***"' + >>> mask_password("'original_password' : 'aaaaa'") + "'original_password' : '***'" + >>> mask_password("u'original_password' : u'aaaaa'") + "u'original_password' : u'***'" + """ + message = six.text_type(message) + + # NOTE(ldbragst): Check to see if anything in message contains any key + # specified in _SANITIZE_KEYS, if not then just return the message since + # we don't have to mask any passwords. + if not any(key in message for key in _SANITIZE_KEYS): + return message + + substitute = r'\g<1>' + secret + r'\g<2>' + for pattern in _SANITIZE_PATTERNS_2: + message = re.sub(pattern, substitute, message) + + substitute = r'\g<1>' + secret + for pattern in _SANITIZE_PATTERNS_1: + message = re.sub(pattern, substitute, message) + + return message diff --git a/cloudbaseinit/openstack/common/systemd.py b/cloudbaseinit/openstack/common/systemd.py new file mode 100644 index 00000000..3b984cc9 --- /dev/null +++ b/cloudbaseinit/openstack/common/systemd.py @@ -0,0 +1,106 @@ +# Copyright 2012-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper module for systemd service readiness notification. +""" + +import os +import socket +import sys + +from cloudbaseinit.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def _abstractify(socket_name): + if socket_name.startswith('@'): + # abstract namespace socket + socket_name = '\0%s' % socket_name[1:] + return socket_name + + +def _sd_notify(unset_env, msg): + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.connect(_abstractify(notify_socket)) + sock.sendall(msg) + if unset_env: + del os.environ['NOTIFY_SOCKET'] + except EnvironmentError: + LOG.debug("Systemd notification failed", exc_info=True) + finally: + sock.close() + + +def notify(): + """Send notification to Systemd that service is ready. + + For details see + http://www.freedesktop.org/software/systemd/man/sd_notify.html + """ + _sd_notify(False, 'READY=1') + + +def notify_once(): + """Send notification once to Systemd that service is ready. + + Systemd sets NOTIFY_SOCKET environment variable with the name of the + socket listening for notifications from services. + This method removes the NOTIFY_SOCKET environment variable to ensure + notification is sent only once. + """ + _sd_notify(True, 'READY=1') + + +def onready(notify_socket, timeout): + """Wait for systemd style notification on the socket. + + :param notify_socket: local socket address + :type notify_socket: string + :param timeout: socket timeout + :type timeout: float + :returns: 0 service ready + 1 service not ready + 2 timeout occurred + """ + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.settimeout(timeout) + sock.bind(_abstractify(notify_socket)) + try: + msg = sock.recv(512) + except socket.timeout: + return 2 + finally: + sock.close() + if 'READY=1' in msg: + return 0 + else: + return 1 + + +if __name__ == '__main__': + # simple CLI for testing + if len(sys.argv) == 1: + notify() + elif len(sys.argv) >= 2: + timeout = float(sys.argv[1]) + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + retval = onready(notify_socket, timeout) + sys.exit(retval) diff --git a/cloudbaseinit/openstack/common/threadgroup.py b/cloudbaseinit/openstack/common/threadgroup.py index b15219c3..d082f8a6 100644 --- a/cloudbaseinit/openstack/common/threadgroup.py +++ b/cloudbaseinit/openstack/common/threadgroup.py @@ -85,7 +85,7 @@ class ThreadGroup(object): def thread_done(self, thread): self.threads.remove(thread) - def stop(self): + def _stop_threads(self): current = threading.current_thread() # Iterate over a copy of self.threads so thread_done doesn't @@ -99,6 +99,7 @@ class ThreadGroup(object): except Exception as ex: LOG.exception(ex) + def stop_timers(self): for x in self.timers: try: x.stop() @@ -106,6 +107,23 @@ class ThreadGroup(object): LOG.exception(ex) self.timers = [] + def stop(self, graceful=False): + """stop function has the option of graceful=True/False. + + * In case of graceful=True, wait for all threads to be finished. + Never kill threads. + * In case of graceful=False, kill threads immediately. + """ + self.stop_timers() + if graceful: + # In case of graceful=True, wait for all threads to be + # finished, never kill threads + self.wait() + else: + # In case of graceful=False(Default), kill threads + # immediately + self._stop_threads() + def wait(self): for x in self.timers: try: diff --git a/cloudbaseinit/openstack/common/timeutils.py b/cloudbaseinit/openstack/common/timeutils.py index 52688a02..c48da95f 100644 --- a/cloudbaseinit/openstack/common/timeutils.py +++ b/cloudbaseinit/openstack/common/timeutils.py @@ -114,7 +114,7 @@ def utcnow(): def iso8601_from_timestamp(timestamp): - """Returns a iso8601 formatted date from timestamp.""" + """Returns an iso8601 formatted date from timestamp.""" return isotime(datetime.datetime.utcfromtimestamp(timestamp)) @@ -134,7 +134,7 @@ def set_time_override(override_time=None): def advance_time_delta(timedelta): """Advance overridden time using a datetime.timedelta.""" - assert(not utcnow.override_time is None) + assert utcnow.override_time is not None try: for dt in utcnow.override_time: dt += timedelta diff --git a/cloudbaseinit/openstack/common/versionutils.py b/cloudbaseinit/openstack/common/versionutils.py index 63372d59..4fff5f05 100644 --- a/cloudbaseinit/openstack/common/versionutils.py +++ b/cloudbaseinit/openstack/common/versionutils.py @@ -18,7 +18,10 @@ Helpers for comparing version strings. """ import functools +import inspect + import pkg_resources +import six from cloudbaseinit.openstack.common.gettextutils import _ from cloudbaseinit.openstack.common import log as logging @@ -52,18 +55,36 @@ class deprecated(object): >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1) ... def c(): pass + 4. Specifying the deprecated functionality will not be removed: + >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=0) + ... def d(): pass + + 5. Specifying a replacement, deprecated functionality will not be removed: + >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()', remove_in=0) + ... def e(): pass + """ + # NOTE(morganfainberg): Bexar is used for unit test purposes, it is + # expected we maintain a gap between Bexar and Folsom in this list. + BEXAR = 'B' FOLSOM = 'F' GRIZZLY = 'G' HAVANA = 'H' ICEHOUSE = 'I' + JUNO = 'J' + KILO = 'K' _RELEASES = { + # NOTE(morganfainberg): Bexar is used for unit test purposes, it is + # expected we maintain a gap between Bexar and Folsom in this list. + 'B': 'Bexar', 'F': 'Folsom', 'G': 'Grizzly', 'H': 'Havana', 'I': 'Icehouse', + 'J': 'Juno', + 'K': 'Kilo', } _deprecated_msg_with_alternative = _( @@ -74,6 +95,12 @@ class deprecated(object): '%(what)s is deprecated as of %(as_of)s and may be ' 'removed in %(remove_in)s. It will not be superseded.') + _deprecated_msg_with_alternative_no_removal = _( + '%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s.') + + _deprecated_msg_with_no_alternative_no_removal = _( + '%(what)s is deprecated as of %(as_of)s. It will not be superseded.') + def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None): """Initialize decorator @@ -91,16 +118,34 @@ class deprecated(object): self.remove_in = remove_in self.what = what - def __call__(self, func): + def __call__(self, func_or_cls): if not self.what: - self.what = func.__name__ + '()' + self.what = func_or_cls.__name__ + '()' + msg, details = self._build_message() - @functools.wraps(func) - def wrapped(*args, **kwargs): - msg, details = self._build_message() - LOG.deprecated(msg, details) - return func(*args, **kwargs) - return wrapped + if inspect.isfunction(func_or_cls): + + @six.wraps(func_or_cls) + def wrapped(*args, **kwargs): + LOG.deprecated(msg, details) + return func_or_cls(*args, **kwargs) + return wrapped + elif inspect.isclass(func_or_cls): + orig_init = func_or_cls.__init__ + + # TODO(tsufiev): change `functools` module to `six` as + # soon as six 1.7.4 (with fix for passing `assigned` + # argument to underlying `functools.wraps`) is released + # and added to the cloudbaseinit-incubator requrements + @functools.wraps(orig_init, assigned=('__name__', '__doc__')) + def new_init(self, *args, **kwargs): + LOG.deprecated(msg, details) + orig_init(self, *args, **kwargs) + func_or_cls.__init__ = new_init + return func_or_cls + else: + raise TypeError('deprecated can be used only with functions or ' + 'classes') def _get_safe_to_remove_release(self, release): # TODO(dstanek): this method will have to be reimplemented once @@ -119,9 +164,19 @@ class deprecated(object): if self.in_favor_of: details['in_favor_of'] = self.in_favor_of - msg = self._deprecated_msg_with_alternative + if self.remove_in > 0: + msg = self._deprecated_msg_with_alternative + else: + # There are no plans to remove this function, but it is + # now deprecated. + msg = self._deprecated_msg_with_alternative_no_removal else: - msg = self._deprecated_msg_no_alternative + if self.remove_in > 0: + msg = self._deprecated_msg_no_alternative + else: + # There are no plans to remove this function, but it is + # now deprecated. + msg = self._deprecated_msg_with_no_alternative_no_removal return msg, details