From fbf4a56efdf719f26c073526a3551c1158060f88 Mon Sep 17 00:00:00 2001 From: Craig Vyvial Date: Tue, 27 Aug 2013 13:33:00 -0500 Subject: [PATCH] oslo-incubator rpc update * updated rpc oslo code Addresses blueprint trove/update-oslo Change-Id: I65cfbc860b139638d460dca47ce114985ddd658c --- requirements.txt | 1 + trove/common/rpc.py | 2 +- trove/openstack/common/crypto/__init__.py | 0 trove/openstack/common/crypto/utils.py | 179 ++++++ trove/openstack/common/eventlet_backdoor.py | 18 +- trove/openstack/common/excutils.py | 13 +- trove/openstack/common/gettextutils.py | 185 +++++-- trove/openstack/common/jsonutils.py | 17 +- trove/openstack/common/local.py | 13 +- trove/openstack/common/log.py | 9 +- trove/openstack/common/loopingcall.py | 2 +- trove/openstack/common/rpc/__init__.py | 5 +- trove/openstack/common/rpc/amqp.py | 25 +- trove/openstack/common/rpc/common.py | 22 +- trove/openstack/common/rpc/impl_fake.py | 2 +- trove/openstack/common/rpc/impl_kombu.py | 52 +- trove/openstack/common/rpc/impl_qpid.py | 32 +- trove/openstack/common/rpc/impl_zmq.py | 3 +- trove/openstack/common/rpc/matchmaker.py | 14 +- trove/openstack/common/rpc/matchmaker_ring.py | 6 +- trove/openstack/common/rpc/proxy.py | 2 +- trove/openstack/common/rpc/securemessage.py | 521 ++++++++++++++++++ trove/openstack/common/rpc/service.py | 8 +- trove/openstack/common/rpc/zmq_receiver.py | 1 - trove/openstack/common/service.py | 155 ++++-- trove/openstack/common/sslutils.py | 22 +- trove/openstack/common/threadgroup.py | 6 +- trove/openstack/common/timeutils.py | 4 +- 28 files changed, 1118 insertions(+), 201 deletions(-) create mode 100644 trove/openstack/common/crypto/__init__.py create mode 100644 trove/openstack/common/crypto/utils.py create mode 100644 trove/openstack/common/rpc/securemessage.py mode change 100755 => 100644 trove/openstack/common/rpc/zmq_receiver.py diff --git a/requirements.txt b/requirements.txt index 7c1faa6986..21558f5e2e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,4 @@ pexpect -f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3 oslo.config>=1.2.0a3 mysql-python +Babel>=0.9.6 diff --git a/trove/common/rpc.py b/trove/common/rpc.py index 2fb1cfaa7f..92be89a14a 100644 --- a/trove/common/rpc.py +++ b/trove/common/rpc.py @@ -34,7 +34,7 @@ def delete_queue(context, topic): if CONF.rpc_backend == "trove.openstack.common.rpc.impl_kombu": connection = openstack_rpc.create_connection() channel = connection.channel - durable = connection.conf.rabbit_durable_queues + durable = connection.conf.amqp_durable_queues queue = kombu.entity.Queue(name=topic, channel=channel, auto_delete=False, exclusive=False, durable=durable) diff --git a/trove/openstack/common/crypto/__init__.py b/trove/openstack/common/crypto/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/openstack/common/crypto/utils.py b/trove/openstack/common/crypto/utils.py new file mode 100644 index 0000000000..be69582618 --- /dev/null +++ b/trove/openstack/common/crypto/utils.py @@ -0,0 +1,179 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 + +from Crypto.Hash import HMAC +from Crypto import Random + +from trove.openstack.common.gettextutils import _ # noqa +from trove.openstack.common import importutils + + +class CryptoutilsException(Exception): + """Generic Exception for Crypto utilities.""" + + message = _("An unknown error occurred in crypto utils.") + + +class CipherBlockLengthTooBig(CryptoutilsException): + """The block size is too big.""" + + def __init__(self, requested, permitted): + msg = _("Block size of %(given)d is too big, max = %(maximum)d") + message = msg % {'given': requested, 'maximum': permitted} + super(CryptoutilsException, self).__init__(message) + + +class HKDFOutputLengthTooLong(CryptoutilsException): + """The amount of Key Material asked is too much.""" + + def __init__(self, requested, permitted): + msg = _("Length of %(given)d is too long, max = %(maximum)d") + message = msg % {'given': requested, 'maximum': permitted} + super(CryptoutilsException, self).__init__(message) + + +class HKDF(object): + """An HMAC-based Key Derivation Function implementation (RFC5869) + + This class creates an object that allows to use HKDF to derive keys. + """ + + def __init__(self, hashtype='SHA256'): + self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype) + self.max_okm_length = 255 * self.hashfn.digest_size + + def extract(self, ikm, salt=None): + """An extract function that can be used to derive a robust key given + weak Input Key Material (IKM) which could be a password. + Returns a pseudorandom key (of HashLen octets) + + :param ikm: input keying material (ex a password) + :param salt: optional salt value (a non-secret random value) + """ + if salt is None: + salt = '\x00' * self.hashfn.digest_size + + return HMAC.new(salt, ikm, self.hashfn).digest() + + def expand(self, prk, info, length): + """An expand function that will return arbitrary length output that can + be used as keys. + Returns a buffer usable as key material. + + :param prk: a pseudorandom key of at least HashLen octets + :param info: optional string (can be a zero-length string) + :param length: length of output keying material (<= 255 * HashLen) + """ + if length > self.max_okm_length: + raise HKDFOutputLengthTooLong(length, self.max_okm_length) + + N = (length + self.hashfn.digest_size - 1) / self.hashfn.digest_size + + okm = "" + tmp = "" + for block in range(1, N + 1): + tmp = HMAC.new(prk, tmp + info + chr(block), self.hashfn).digest() + okm += tmp + + return okm[:length] + + +MAX_CB_SIZE = 256 + + +class SymmetricCrypto(object): + """Symmetric Key Crypto object. + + This class creates a Symmetric Key Crypto object that can be used + to encrypt, decrypt, or sign arbitrary data. + + :param enctype: Encryption Cipher name (default: AES) + :param hashtype: Hash/HMAC type name (default: SHA256) + """ + + def __init__(self, enctype='AES', hashtype='SHA256'): + self.cipher = importutils.import_module('Crypto.Cipher.' + enctype) + self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype) + + def new_key(self, size): + return Random.new().read(size) + + def encrypt(self, key, msg, b64encode=True): + """Encrypt the provided msg and returns the cyphertext optionally + base64 encoded. + + Uses AES-128-CBC with a Random IV by default. + + The plaintext is padded to reach blocksize length. + The last byte of the block is the length of the padding. + The length of the padding does not include the length byte itself. + + :param key: The Encryption key. + :param msg: the plain text. + + :returns encblock: a block of encrypted data. + """ + iv = Random.new().read(self.cipher.block_size) + cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv) + + # CBC mode requires a fixed block size. Append padding and length of + # padding. + if self.cipher.block_size > MAX_CB_SIZE: + raise CipherBlockLengthTooBig(self.cipher.block_size, MAX_CB_SIZE) + r = len(msg) % self.cipher.block_size + padlen = self.cipher.block_size - r - 1 + msg += '\x00' * padlen + msg += chr(padlen) + + enc = iv + cipher.encrypt(msg) + if b64encode: + enc = base64.b64encode(enc) + return enc + + def decrypt(self, key, msg, b64decode=True): + """Decrypts the provided ciphertext, optionally base 64 encoded, and + returns the plaintext message, after padding is removed. + + Uses AES-128-CBC with an IV by default. + + :param key: The Encryption key. + :param msg: the ciphetext, the first block is the IV + """ + if b64decode: + msg = base64.b64decode(msg) + iv = msg[:self.cipher.block_size] + cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv) + + padded = cipher.decrypt(msg[self.cipher.block_size:]) + l = ord(padded[-1]) + 1 + plain = padded[:-l] + return plain + + def sign(self, key, msg, b64encode=True): + """Signs a message string and returns a base64 encoded signature. + + Uses HMAC-SHA-256 by default. + + :param key: The Signing key. + :param msg: the message to sign. + """ + h = HMAC.new(key, msg, self.hashfn) + out = h.digest() + if b64encode: + out = base64.b64encode(out) + return out diff --git a/trove/openstack/common/eventlet_backdoor.py b/trove/openstack/common/eventlet_backdoor.py index 1223d532be..b3f55fa3c9 100644 --- a/trove/openstack/common/eventlet_backdoor.py +++ b/trove/openstack/common/eventlet_backdoor.py @@ -31,20 +31,20 @@ import eventlet.backdoor import greenlet from oslo.config import cfg -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import log as logging -help_for_backdoor_port = 'Acceptable ' + \ - 'values are 0, and :, where 0 results in ' + \ - 'listening on a random tcp port number, results in ' + \ - 'listening on the specified port number and not enabling backdoor' + \ - 'if it is in use and : results in listening on the ' + \ - 'smallest unused port number within the specified range of port ' + \ - 'numbers. The chosen port is displayed in the service\'s log file.' +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ cfg.StrOpt('backdoor_port', default=None, - help='Enable eventlet backdoor. %s' % help_for_backdoor_port) + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] CONF = cfg.CONF diff --git a/trove/openstack/common/excutils.py b/trove/openstack/common/excutils.py index 525201fced..7cad0a1741 100644 --- a/trove/openstack/common/excutils.py +++ b/trove/openstack/common/excutils.py @@ -24,7 +24,9 @@ import sys import time import traceback -from trove.openstack.common.gettextutils import _ +import six + +from trove.openstack.common.gettextutils import _ # noqa class save_and_reraise_exception(object): @@ -65,7 +67,7 @@ class save_and_reraise_exception(object): self.tb)) return False if self.reraise: - raise self.type_, self.value, self.tb + six.reraise(self.type_, self.value, self.tb) def forever_retry_uncaught_exceptions(infunc): @@ -77,7 +79,8 @@ def forever_retry_uncaught_exceptions(infunc): try: return infunc(*args, **kwargs) except Exception as exc: - if exc.message == last_exc_message: + this_exc_message = unicode(exc) + if this_exc_message == last_exc_message: exc_count += 1 else: exc_count = 1 @@ -85,12 +88,12 @@ def forever_retry_uncaught_exceptions(infunc): # the exception message changes cur_time = int(time.time()) if (cur_time - last_log_time > 60 or - exc.message != last_exc_message): + this_exc_message != last_exc_message): logging.exception( _('Unexpected exception occurred %d time(s)... ' 'retrying.') % exc_count) last_log_time = cur_time - last_exc_message = exc.message + last_exc_message = this_exc_message exc_count = 0 # This should be a very rare event. In case it isn't, do # a sleep. diff --git a/trove/openstack/common/gettextutils.py b/trove/openstack/common/gettextutils.py index b1794553f6..30e39dd03a 100644 --- a/trove/openstack/common/gettextutils.py +++ b/trove/openstack/common/gettextutils.py @@ -1,8 +1,8 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 Red Hat, Inc. -# All Rights Reserved. # Copyright 2013 IBM Corp. +# All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -26,19 +26,44 @@ Usual usage in an openstack.common module: import copy import gettext -import logging.handlers +import logging import os -import UserString +import re +try: + import UserString as _userString +except ImportError: + import collections as _userString + +from babel import localedata +import six _localedir = os.environ.get('trove'.upper() + '_LOCALEDIR') _t = gettext.translation('trove', localedir=_localedir, fallback=True) +_AVAILABLE_LANGUAGES = {} +USE_LAZY = False + + +def enable_lazy(): + """Convenience function for configuring _() to use lazy gettext + + Call this at the start of execution to enable the gettextutils._ + function to use lazy gettext functionality. This is useful if + your project is importing _ directly instead of using the + gettextutils.install() way of importing the _ function. + """ + global USE_LAZY + USE_LAZY = True + def _(msg): - return _t.ugettext(msg) + if USE_LAZY: + return Message(msg, 'trove') + else: + return _t.ugettext(msg) -def install(domain): +def install(domain, lazy=False): """Install a _() function using the given translation domain. Given a translation domain, install a _() function using gettext's @@ -48,44 +73,48 @@ def install(domain): overriding the default localedir (e.g. /usr/share/locale) using a translation-domain-specific environment variable (e.g. NOVA_LOCALEDIR). + + :param domain: the translation domain + :param lazy: indicates whether or not to install the lazy _() function. + The lazy _() introduces a way to do deferred translation + of messages by installing a _ that builds Message objects, + instead of strings, which can then be lazily translated into + any available locale. """ - gettext.install(domain, - localedir=os.environ.get(domain.upper() + '_LOCALEDIR'), - unicode=True) + if lazy: + # NOTE(mrodden): Lazy gettext functionality. + # + # The following introduces a deferred way to do translations on + # messages in OpenStack. We override the standard _() function + # and % (format string) operation to build Message objects that can + # later be translated when we have more information. + # + # Also included below is an example LocaleHandler that translates + # Messages to an associated locale, effectively allowing many logs, + # each with their own locale. + + def _lazy_gettext(msg): + """Create and return a Message object. + + Lazy gettext function for a given domain, it is a factory method + for a project/module to get a lazy gettext function for its own + translation domain (i.e. nova, glance, cinder, etc.) + + Message encapsulates a string so that we can translate + it later when needed. + """ + return Message(msg, domain) + + import __builtin__ + __builtin__.__dict__['_'] = _lazy_gettext + else: + localedir = '%s_LOCALEDIR' % domain.upper() + gettext.install(domain, + localedir=os.environ.get(localedir), + unicode=True) -""" -Lazy gettext functionality. - -The following is an attempt to introduce a deferred way -to do translations on messages in OpenStack. We attempt to -override the standard _() function and % (format string) operation -to build Message objects that can later be translated when we have -more information. Also included is an example LogHandler that -translates Messages to an associated locale, effectively allowing -many logs, each with their own locale. -""" - - -def get_lazy_gettext(domain): - """Assemble and return a lazy gettext function for a given domain. - - Factory method for a project/module to get a lazy gettext function - for its own translation domain (i.e. nova, glance, cinder, etc.) - """ - - def _lazy_gettext(msg): - """Create and return a Message object. - - Message encapsulates a string so that we can translate it later when - needed. - """ - return Message(msg, domain) - - return _lazy_gettext - - -class Message(UserString.UserString, object): +class Message(_userString.UserString, object): """Class used to encapsulate translatable messages.""" def __init__(self, msg, domain): # _msg is the gettext msgid and should never change @@ -120,7 +149,29 @@ class Message(UserString.UserString, object): if self.params is not None: full_msg = full_msg % self.params - return unicode(full_msg) + return six.text_type(full_msg) + + def _save_dictionary_parameter(self, dict_param): + full_msg = self.data + # look for %(blah) fields in string; + # ignore %% and deal with the + # case where % is first character on the line + keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg) + + # if we don't find any %(blah) blocks but have a %s + if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg): + # apparently the full dictionary is the parameter + params = copy.deepcopy(dict_param) + else: + params = {} + for key in keys: + try: + params[key] = copy.deepcopy(dict_param[key]) + except TypeError: + # cast uncopyable thing to unicode string + params[key] = unicode(dict_param[key]) + + return params def _save_parameters(self, other): # we check for None later to see if @@ -128,8 +179,16 @@ class Message(UserString.UserString, object): # so encapsulate if our parameter is actually None if other is None: self.params = (other, ) + elif isinstance(other, dict): + self.params = self._save_dictionary_parameter(other) else: - self.params = copy.deepcopy(other) + # fallback to casting to unicode, + # this will handle the problematic python code-like + # objects that cannot be deep-copied + try: + self.params = copy.deepcopy(other) + except TypeError: + self.params = unicode(other) return self @@ -196,7 +255,47 @@ class Message(UserString.UserString, object): if name in ops: return getattr(self.data, name) else: - return UserString.UserString.__getattribute__(self, name) + return _userString.UserString.__getattribute__(self, name) + + +def get_available_languages(domain): + """Lists the available languages for the given translation domain. + + :param domain: the domain to get languages for + """ + if domain in _AVAILABLE_LANGUAGES: + return copy.copy(_AVAILABLE_LANGUAGES[domain]) + + localedir = '%s_LOCALEDIR' % domain.upper() + find = lambda x: gettext.find(domain, + localedir=os.environ.get(localedir), + languages=[x]) + + # NOTE(mrodden): en_US should always be available (and first in case + # order matters) since our in-line message strings are en_US + language_list = ['en_US'] + # NOTE(luisg): Babel <1.0 used a function called list(), which was + # renamed to locale_identifiers() in >=1.0, the requirements master list + # requires >=0.9.6, uncapped, so defensively work with both. We can remove + # this check when the master list updates to >=1.0, and all projects udpate + list_identifiers = (getattr(localedata, 'list', None) or + getattr(localedata, 'locale_identifiers')) + locale_identifiers = list_identifiers() + for i in locale_identifiers: + if find(i) is not None: + language_list.append(i) + _AVAILABLE_LANGUAGES[domain] = language_list + return copy.copy(language_list) + + +def get_localized_message(message, user_locale): + """Gets a localized version of the given message in the given locale.""" + if isinstance(message, Message): + if user_locale: + message.locale = user_locale + return unicode(message) + else: + return message class LocaleHandler(logging.Handler): diff --git a/trove/openstack/common/jsonutils.py b/trove/openstack/common/jsonutils.py index ff96a5a47b..9326350245 100644 --- a/trove/openstack/common/jsonutils.py +++ b/trove/openstack/common/jsonutils.py @@ -38,14 +38,18 @@ import functools import inspect import itertools import json -import types -import xmlrpclib +try: + import xmlrpclib +except ImportError: + # NOTE(jd): xmlrpclib is not shipped with Python 3 + xmlrpclib = None -import netaddr import six +from trove.openstack.common import importutils from trove.openstack.common import timeutils +netaddr = importutils.try_import("netaddr") _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.isfunction, inspect.isgeneratorfunction, @@ -53,7 +57,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.iscode, inspect.isbuiltin, inspect.isroutine, inspect.isabstract] -_simple_types = (types.NoneType, int, basestring, bool, float, long) +_simple_types = (six.string_types + six.integer_types + + (type(None), bool, float)) def to_primitive(value, convert_instances=False, convert_datetime=True, @@ -125,7 +130,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled - if isinstance(value, xmlrpclib.DateTime): + if xmlrpclib and isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) if convert_datetime and isinstance(value, datetime.datetime): @@ -138,7 +143,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Likely an instance of something. Watch for cycles. # Ignore class member vars. return recursive(value.__dict__, level=level + 1) - elif isinstance(value, netaddr.IPAddress): + elif netaddr and isinstance(value, netaddr.IPAddress): return six.text_type(value) else: if any(test(value) for test in _nasty_type_tests): diff --git a/trove/openstack/common/local.py b/trove/openstack/common/local.py index f1bfc824bf..e82f17d0f3 100644 --- a/trove/openstack/common/local.py +++ b/trove/openstack/common/local.py @@ -15,16 +15,15 @@ # License for the specific language governing permissions and limitations # under the License. -"""Greenthread local storage of variables using weak references""" +"""Local storage of variables using weak references""" +import threading import weakref -from eventlet import corolocal - -class WeakLocal(corolocal.local): +class WeakLocal(threading.local): def __getattribute__(self, attr): - rval = corolocal.local.__getattribute__(self, attr) + rval = super(WeakLocal, self).__getattribute__(attr) if rval: # NOTE(mikal): this bit is confusing. What is stored is a weak # reference, not the value itself. We therefore need to lookup @@ -34,7 +33,7 @@ class WeakLocal(corolocal.local): def __setattr__(self, attr, value): value = weakref.ref(value) - return corolocal.local.__setattr__(self, attr, value) + return super(WeakLocal, self).__setattr__(attr, value) # NOTE(mikal): the name "store" should be deprecated in the future @@ -45,4 +44,4 @@ store = WeakLocal() # "strong" store will hold a reference to the object so that it never falls out # of scope. weak_store = WeakLocal() -strong_store = corolocal.local +strong_store = threading.local() diff --git a/trove/openstack/common/log.py b/trove/openstack/common/log.py index f0281d70d5..14e5386841 100644 --- a/trove/openstack/common/log.py +++ b/trove/openstack/common/log.py @@ -29,8 +29,6 @@ It also allows setting of formatting information through conf. """ -import ConfigParser -import cStringIO import inspect import itertools import logging @@ -41,8 +39,9 @@ import sys import traceback from oslo.config import cfg +from six import moves -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import jsonutils from trove.openstack.common import local @@ -348,7 +347,7 @@ class LogConfigError(Exception): def _load_log_config(log_config): try: logging.config.fileConfig(log_config) - except ConfigParser.Error as exc: + except moves.configparser.Error as exc: raise LogConfigError(log_config, str(exc)) @@ -521,7 +520,7 @@ class ContextFormatter(logging.Formatter): if not record: return logging.Formatter.formatException(self, exc_info) - stringbuffer = cStringIO.StringIO() + stringbuffer = moves.StringIO() traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], None, stringbuffer) lines = stringbuffer.getvalue().split('\n') diff --git a/trove/openstack/common/loopingcall.py b/trove/openstack/common/loopingcall.py index c18771c243..0e45968614 100644 --- a/trove/openstack/common/loopingcall.py +++ b/trove/openstack/common/loopingcall.py @@ -22,7 +22,7 @@ import sys from eventlet import event from eventlet import greenthread -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import log as logging from trove.openstack.common import timeutils diff --git a/trove/openstack/common/rpc/__init__.py b/trove/openstack/common/rpc/__init__.py index e4807bb190..aad6febafa 100644 --- a/trove/openstack/common/rpc/__init__.py +++ b/trove/openstack/common/rpc/__init__.py @@ -29,7 +29,7 @@ import inspect from oslo.config import cfg -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import local from trove.openstack.common import log as logging @@ -56,8 +56,7 @@ rpc_opts = [ help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['trove.openstack.common.exception', - 'nova.exception', + default=['nova.exception', 'cinder.exception', 'exceptions', ], diff --git a/trove/openstack/common/rpc/amqp.py b/trove/openstack/common/rpc/amqp.py index 862ac8e75c..af2d490384 100644 --- a/trove/openstack/common/rpc/amqp.py +++ b/trove/openstack/common/rpc/amqp.py @@ -34,14 +34,28 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore +from oslo.config import cfg from trove.openstack.common import excutils -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import local from trove.openstack.common import log as logging from trove.openstack.common.rpc import common as rpc_common +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -286,8 +300,13 @@ def pack_context(msg, context): for args at some point. """ - context_d = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) + if isinstance(context, dict): + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.iteritems()]) + else: + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) diff --git a/trove/openstack/common/rpc/common.py b/trove/openstack/common/rpc/common.py index ccd5d00540..bdeb394770 100644 --- a/trove/openstack/common/rpc/common.py +++ b/trove/openstack/common/rpc/common.py @@ -24,7 +24,7 @@ import traceback from oslo.config import cfg import six -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import jsonutils from trove.openstack.common import local @@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote' class RPCException(Exception): - message = _("An unknown RPC related exception occurred.") + msg_fmt = _("An unknown RPC related exception occurred.") def __init__(self, message=None, **kwargs): self.kwargs = kwargs if not message: try: - message = self.message % kwargs + message = self.msg_fmt % kwargs except Exception: # kwargs doesn't match a variable in the message @@ -90,7 +90,7 @@ class RPCException(Exception): for name, value in kwargs.iteritems(): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened - message = self.message + message = self.msg_fmt super(RPCException, self).__init__(message) @@ -104,7 +104,7 @@ class RemoteError(RPCException): contains all of the relevant info. """ - message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") def __init__(self, exc_type=None, value=None, traceback=None): self.exc_type = exc_type @@ -121,7 +121,7 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _('Timeout while waiting on RPC response - ' + msg_fmt = _('Timeout while waiting on RPC response - ' 'topic: "%(topic)s", RPC method: "%(method)s" ' 'info: "%(info)s"') @@ -144,25 +144,25 @@ class Timeout(RPCException): class DuplicateMessageError(RPCException): - message = _("Found duplicate message(%(msg_id)s). Skipping it.") + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") class InvalidRPCConnectionReuse(RPCException): - message = _("Invalid reuse of an RPC connection.") + msg_fmt = _("Invalid reuse of an RPC connection.") class UnsupportedRpcVersion(RPCException): - message = _("Specified RPC version, %(version)s, not supported by " + msg_fmt = _("Specified RPC version, %(version)s, not supported by " "this endpoint.") class UnsupportedRpcEnvelopeVersion(RPCException): - message = _("Specified RPC envelope version, %(version)s, " + msg_fmt = _("Specified RPC envelope version, %(version)s, " "not supported by this endpoint.") class RpcVersionCapError(RPCException): - message = _("Specified RPC version cap, %(version_cap)s, is too low") + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") class Connection(object): diff --git a/trove/openstack/common/rpc/impl_fake.py b/trove/openstack/common/rpc/impl_fake.py index a0ec1e9d0c..486af7be00 100644 --- a/trove/openstack/common/rpc/impl_fake.py +++ b/trove/openstack/common/rpc/impl_fake.py @@ -146,7 +146,7 @@ def multicall(conf, context, topic, msg, timeout=None): try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): - return iter([None]) + raise rpc_common.Timeout("No consumers available") else: return consumer.call(context, version, method, namespace, args, timeout) diff --git a/trove/openstack/common/rpc/impl_kombu.py b/trove/openstack/common/rpc/impl_kombu.py index 478c6a622b..5d395182e2 100644 --- a/trove/openstack/common/rpc/impl_kombu.py +++ b/trove/openstack/common/rpc/impl_kombu.py @@ -30,15 +30,19 @@ import kombu.messaging from oslo.config import cfg from trove.openstack.common import excutils -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import network_utils from trove.openstack.common.rpc import amqp as rpc_amqp from trove.openstack.common.rpc import common as rpc_common +from trove.openstack.common import sslutils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', default='', - help='SSL version to use (valid only if SSL enabled)'), + help='SSL version to use (valid only if SSL enabled). ' + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' + 'be available on some distributions' + ), cfg.StrOpt('kombu_ssl_keyfile', default='', help='SSL key file (valid only if SSL enabled)'), @@ -82,12 +86,6 @@ kombu_opts = [ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.IntOpt('rabbit_heartbeat', - default=60, - help='Seconds between connection keepalive heartbeats'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -148,29 +146,23 @@ class ConsumerBase(object): Messages that are processed without exception are ack'ed. If the message processing generates an exception, it will be - ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. - Rejection is better than waiting for the message to timeout. - Rejected messages are immediately requeued. + ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed. """ - ack_msg = False try: msg = rpc_common.deserialize_msg(message.payload) callback(msg) - ack_msg = True except Exception: if self.ack_on_error: - ack_msg = True LOG.exception(_("Failed to process message" " ... skipping it.")) + message.ack() else: LOG.exception(_("Failed to process message" " ... will requeue.")) - finally: - if ack_msg: - message.ack() - else: - message.reject() + message.requeue() + else: + message.ack() def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will @@ -259,9 +251,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -365,8 +357,8 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -396,7 +388,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'.""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) @@ -452,7 +444,6 @@ class Connection(object): 'userid': self.conf.rabbit_userid, 'password': self.conf.rabbit_password, 'virtual_host': self.conf.rabbit_virtual_host, - 'heartbeat': self.conf.rabbit_heartbeat, } for sp_key, value in server_params.iteritems(): @@ -481,7 +472,8 @@ class Connection(object): # http://docs.python.org/library/ssl.html - ssl.wrap_socket if self.conf.kombu_ssl_version: - ssl_params['ssl_version'] = self.conf.kombu_ssl_version + ssl_params['ssl_version'] = sslutils.validate_ssl_version( + self.conf.kombu_ssl_version) if self.conf.kombu_ssl_keyfile: ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile if self.conf.kombu_ssl_certfile: @@ -492,12 +484,8 @@ class Connection(object): # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED - if not ssl_params: - # Just have the default behavior - return True - else: - # Return the extended behavior - return ssl_params + # Return the extended behavior or just have the default behavior + return ssl_params or True def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have diff --git a/trove/openstack/common/rpc/impl_qpid.py b/trove/openstack/common/rpc/impl_qpid.py index 73c48a984e..df523ac333 100644 --- a/trove/openstack/common/rpc/impl_qpid.py +++ b/trove/openstack/common/rpc/impl_qpid.py @@ -25,7 +25,7 @@ import greenlet from oslo.config import cfg from trove.openstack.common import excutils -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import jsonutils from trove.openstack.common import log as logging @@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + super(DirectConsumer, self).__init__( + session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + }) class TopicConsumer(ConsumerBase): @@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + super(TopicConsumer, self).__init__( + session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, + { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + }) class FanoutConsumer(ConsumerBase): @@ -228,7 +238,7 @@ class FanoutConsumer(ConsumerBase): {"exclusive": True}) def reconnect(self, session): - topic = self.get_node_name() + topic = self.get_node_name().rpartition('_fanout')[0] params = { 'session': session, 'topic': topic, @@ -310,7 +320,7 @@ class DirectPublisher(Publisher): def __init__(self, conf, session, msg_id): """Init a 'direct' publisher.""" super(DirectPublisher, self).__init__(session, msg_id, - {"type": "Direct"}) + {"type": "direct"}) class TopicPublisher(Publisher): diff --git a/trove/openstack/common/rpc/impl_zmq.py b/trove/openstack/common/rpc/impl_zmq.py index 333527be4b..27a66eec8b 100644 --- a/trove/openstack/common/rpc/impl_zmq.py +++ b/trove/openstack/common/rpc/impl_zmq.py @@ -27,7 +27,7 @@ import greenlet from oslo.config import cfg from trove.openstack.common import excutils -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import jsonutils from trove.openstack.common.rpc import common as rpc_common @@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) def consume_in_thread(self): + @excutils.forever_retry_uncaught_exceptions def _consume(sock): LOG.info(_("Consuming socket")) while True: diff --git a/trove/openstack/common/rpc/matchmaker.py b/trove/openstack/common/rpc/matchmaker.py index 01d97e2e59..25df55c447 100644 --- a/trove/openstack/common/rpc/matchmaker.py +++ b/trove/openstack/common/rpc/matchmaker.py @@ -23,7 +23,7 @@ import contextlib import eventlet from oslo.config import cfg -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import log as logging @@ -248,9 +248,7 @@ class DirectBinding(Binding): that it maps directly to a host, thus direct. """ def test(self, key): - if '.' in key: - return True - return False + return '.' in key class TopicBinding(Binding): @@ -262,17 +260,13 @@ class TopicBinding(Binding): matches that of a direct exchange. """ def test(self, key): - if '.' not in key: - return True - return False + return '.' not in key class FanoutBinding(Binding): """Match on fanout keys, where key starts with 'fanout.' string.""" def test(self, key): - if key.startswith('fanout~'): - return True - return False + return key.startswith('fanout~') class StubExchange(Exchange): diff --git a/trove/openstack/common/rpc/matchmaker_ring.py b/trove/openstack/common/rpc/matchmaker_ring.py index 4b2feb7d8c..39b57b866b 100644 --- a/trove/openstack/common/rpc/matchmaker_ring.py +++ b/trove/openstack/common/rpc/matchmaker_ring.py @@ -23,7 +23,7 @@ import json from oslo.config import cfg -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import log as logging from trove.openstack.common.rpc import matchmaker as mm @@ -63,9 +63,7 @@ class RingExchange(mm.Exchange): self.ring0[k] = itertools.cycle(self.ring[k]) def _ring_has(self, key): - if key in self.ring0: - return True - return False + return key in self.ring0 class RoundRobinRingExchange(RingExchange): diff --git a/trove/openstack/common/rpc/proxy.py b/trove/openstack/common/rpc/proxy.py index 94ad682aed..1f66e20b4d 100644 --- a/trove/openstack/common/rpc/proxy.py +++ b/trove/openstack/common/rpc/proxy.py @@ -69,7 +69,7 @@ class RpcProxy(object): v = vers if vers else self.default_version if (self.version_cap and not rpc_common.version_is_compatible(self.version_cap, v)): - raise rpc_common.RpcVersionCapError(version=self.version_cap) + raise rpc_common.RpcVersionCapError(version_cap=self.version_cap) msg['version'] = v def _get_topic(self, topic): diff --git a/trove/openstack/common/rpc/securemessage.py b/trove/openstack/common/rpc/securemessage.py new file mode 100644 index 0000000000..d2f48c0fe3 --- /dev/null +++ b/trove/openstack/common/rpc/securemessage.py @@ -0,0 +1,521 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import collections +import os +import struct +import time + +import requests + +from oslo.config import cfg + +from trove.openstack.common.crypto import utils as cryptoutils +from trove.openstack.common import jsonutils +from trove.openstack.common import log as logging + +secure_message_opts = [ + cfg.BoolOpt('enabled', default=True, + help='Whether Secure Messaging (Signing) is enabled,' + ' defaults to enabled'), + cfg.BoolOpt('enforced', default=False, + help='Whether Secure Messaging (Signing) is enforced,' + ' defaults to not enforced'), + cfg.BoolOpt('encrypt', default=False, + help='Whether Secure Messaging (Encryption) is enabled,' + ' defaults to not enabled'), + cfg.StrOpt('secret_keys_file', + help='Path to the file containing the keys, takes precedence' + ' over secret_key'), + cfg.MultiStrOpt('secret_key', + help='A list of keys: (ex: name:),' + ' ignored if secret_keys_file is set'), + cfg.StrOpt('kds_endpoint', + help='KDS endpoint (ex: http://kds.example.com:35357/v3)'), +] +secure_message_group = cfg.OptGroup('secure_messages', + title='Secure Messaging options') + +LOG = logging.getLogger(__name__) + + +class SecureMessageException(Exception): + """Generic Exception for Secure Messages.""" + + msg = "An unknown Secure Message related exception occurred." + + def __init__(self, msg=None): + if msg is None: + msg = self.msg + super(SecureMessageException, self).__init__(msg) + + +class SharedKeyNotFound(SecureMessageException): + """No shared key was found and no other external authentication mechanism + is available. + """ + + msg = "Shared Key for [%s] Not Found. (%s)" + + def __init__(self, name, errmsg): + super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg)) + + +class InvalidMetadata(SecureMessageException): + """The metadata is invalid.""" + + msg = "Invalid metadata: %s" + + def __init__(self, err): + super(InvalidMetadata, self).__init__(self.msg % err) + + +class InvalidSignature(SecureMessageException): + """Signature validation failed.""" + + msg = "Failed to validate signature (source=%s, destination=%s)" + + def __init__(self, src, dst): + super(InvalidSignature, self).__init__(self.msg % (src, dst)) + + +class UnknownDestinationName(SecureMessageException): + """The Destination name is unknown to us.""" + + msg = "Invalid destination name (%s)" + + def __init__(self, name): + super(UnknownDestinationName, self).__init__(self.msg % name) + + +class InvalidEncryptedTicket(SecureMessageException): + """The Encrypted Ticket could not be successfully handled.""" + + msg = "Invalid Ticket (source=%s, destination=%s)" + + def __init__(self, src, dst): + super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst)) + + +class InvalidExpiredTicket(SecureMessageException): + """The ticket received is already expired.""" + + msg = "Expired ticket (source=%s, destination=%s)" + + def __init__(self, src, dst): + super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst)) + + +class CommunicationError(SecureMessageException): + """The Communication with the KDS failed.""" + + msg = "Communication Error (target=%s): %s" + + def __init__(self, target, errmsg): + super(CommunicationError, self).__init__(self.msg % (target, errmsg)) + + +class InvalidArgument(SecureMessageException): + """Bad initialization argument.""" + + msg = "Invalid argument: %s" + + def __init__(self, errmsg): + super(InvalidArgument, self).__init__(self.msg % errmsg) + + +Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek']) + + +class KeyStore(object): + """A storage class for Signing and Encryption Keys. + + This class creates an object that holds Generic Keys like Signing + Keys, Encryption Keys, Encrypted SEK Tickets ... + """ + + def __init__(self): + self._kvps = dict() + + def _get_key_name(self, source, target, ktype): + return (source, target, ktype) + + def _put(self, src, dst, ktype, expiration, data): + name = self._get_key_name(src, dst, ktype) + self._kvps[name] = (expiration, data) + + def _get(self, src, dst, ktype): + name = self._get_key_name(src, dst, ktype) + if name in self._kvps: + expiration, data = self._kvps[name] + if expiration > time.time(): + return data + else: + del self._kvps[name] + + return None + + def clear(self): + """Wipes the store clear of all data.""" + self._kvps.clear() + + def put_ticket(self, source, target, skey, ekey, esek, expiration): + """Puts a sek pair in the cache. + + :param source: Client name + :param target: Target name + :param skey: The Signing Key + :param ekey: The Encription Key + :param esek: The token encrypted with the target key + :param expiration: Expiration time in seconds since Epoch + """ + keys = Ticket(skey, ekey, esek) + self._put(source, target, 'ticket', expiration, keys) + + def get_ticket(self, source, target): + """Returns a Ticket (skey, ekey, esek) namedtuple for the + source/target pair. + """ + return self._get(source, target, 'ticket') + + +_KEY_STORE = KeyStore() + + +class _KDSClient(object): + + USER_AGENT = 'oslo-incubator/rpc' + + def __init__(self, endpoint=None, timeout=None): + """A KDS Client class.""" + + self._endpoint = endpoint + if timeout is not None: + self.timeout = float(timeout) + else: + self.timeout = None + + def _do_get(self, url, request): + req_kwargs = dict() + req_kwargs['headers'] = dict() + req_kwargs['headers']['User-Agent'] = self.USER_AGENT + req_kwargs['headers']['Content-Type'] = 'application/json' + req_kwargs['data'] = jsonutils.dumps({'request': request}) + if self.timeout is not None: + req_kwargs['timeout'] = self.timeout + + try: + resp = requests.get(url, **req_kwargs) + except requests.ConnectionError as e: + err = "Unable to establish connection. %s" % e + raise CommunicationError(url, err) + + return resp + + def _get_reply(self, url, resp): + if resp.text: + try: + body = jsonutils.loads(resp.text) + reply = body['reply'] + except (KeyError, TypeError, ValueError): + msg = "Failed to decode reply: %s" % resp.text + raise CommunicationError(url, msg) + else: + msg = "No reply data was returned." + raise CommunicationError(url, msg) + + return reply + + def _get_ticket(self, request, url=None, redirects=10): + """Send an HTTP request. + + Wraps around 'requests' to handle redirects and common errors. + """ + if url is None: + if not self._endpoint: + raise CommunicationError(url, 'Endpoint not configured') + url = self._endpoint + '/kds/ticket' + + while redirects: + resp = self._do_get(url, request) + if resp.status_code in (301, 302, 305): + # Redirected. Reissue the request to the new location. + url = resp.headers['location'] + redirects -= 1 + continue + elif resp.status_code != 200: + msg = "Request returned failure status: %s (%s)" + err = msg % (resp.status_code, resp.text) + raise CommunicationError(url, err) + + return self._get_reply(url, resp) + + raise CommunicationError(url, "Too many redirections, giving up!") + + def get_ticket(self, source, target, crypto, key): + + # prepare metadata + md = {'requestor': source, + 'target': target, + 'timestamp': time.time(), + 'nonce': struct.unpack('Q', os.urandom(8))[0]} + metadata = base64.b64encode(jsonutils.dumps(md)) + + # sign metadata + signature = crypto.sign(key, metadata) + + # HTTP request + reply = self._get_ticket({'metadata': metadata, + 'signature': signature}) + + # verify reply + signature = crypto.sign(key, (reply['metadata'] + reply['ticket'])) + if signature != reply['signature']: + raise InvalidEncryptedTicket(md['source'], md['destination']) + md = jsonutils.loads(base64.b64decode(reply['metadata'])) + if ((md['source'] != source or + md['destination'] != target or + md['expiration'] < time.time())): + raise InvalidEncryptedTicket(md['source'], md['destination']) + + # return ticket data + tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket'])) + + return tkt, md['expiration'] + + +# we need to keep a global nonce, as this value should never repeat non +# matter how many SecureMessage objects we create +_NONCE = None + + +def _get_nonce(): + """We keep a single counter per instance, as it is so huge we can't + possibly cycle through within 1/100 of a second anyway. + """ + + global _NONCE + # Lazy initialize, for now get a random value, multiply by 2^32 and + # use it as the nonce base. The counter itself will rotate after + # 2^32 increments. + if _NONCE is None: + _NONCE = [struct.unpack('I', os.urandom(4))[0], 0] + + # Increment counter and wrap at 2^32 + _NONCE[1] += 1 + if _NONCE[1] > 0xffffffff: + _NONCE[1] = 0 + + # Return base + counter + return long((_NONCE[0] * 0xffffffff)) + _NONCE[1] + + +class SecureMessage(object): + """A Secure Message object. + + This class creates a signing/encryption facility for RPC messages. + It encapsulates all the necessary crypto primitives to insulate + regular code from the intricacies of message authentication, validation + and optionally encryption. + + :param topic: The topic name of the queue + :param host: The server name, together with the topic it forms a unique + name that is used to source signing keys, and verify + incoming messages. + :param conf: a ConfigOpts object + :param key: (optional) explicitly pass in endpoint private key. + If not provided it will be sourced from the service config + :param key_store: (optional) Storage class for local caching + :param encrypt: (defaults to False) Whether to encrypt messages + :param enctype: (defaults to AES) Cipher to use + :param hashtype: (defaults to SHA256) Hash function to use for signatures + """ + + def __init__(self, topic, host, conf, key=None, key_store=None, + encrypt=None, enctype='AES', hashtype='SHA256'): + + conf.register_group(secure_message_group) + conf.register_opts(secure_message_opts, group='secure_messages') + + self._name = '%s.%s' % (topic, host) + self._key = key + self._conf = conf.secure_messages + self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt + self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype) + self._hkdf = cryptoutils.HKDF(hashtype) + self._kds = _KDSClient(self._conf.kds_endpoint) + + if self._key is None: + self._key = self._init_key(topic, self._name) + if self._key is None: + err = "Secret Key (or key file) is missing or malformed" + raise SharedKeyNotFound(self._name, err) + + self._key_store = key_store or _KEY_STORE + + def _init_key(self, topic, name): + keys = None + if self._conf.secret_keys_file: + with open(self._conf.secret_keys_file, 'r') as f: + keys = f.readlines() + elif self._conf.secret_key: + keys = self._conf.secret_key + + if keys is None: + return None + + for k in keys: + if k[0] == '#': + continue + if ':' not in k: + break + svc, key = k.split(':', 1) + if svc == topic or svc == name: + return base64.b64decode(key) + + return None + + def _split_key(self, key, size): + sig_key = key[:size] + enc_key = key[size:] + return sig_key, enc_key + + def _decode_esek(self, key, source, target, timestamp, esek): + """This function decrypts the esek buffer passed in and returns a + KeyStore to be used to check and decrypt the received message. + + :param key: The key to use to decrypt the ticket (esek) + :param source: The name of the source service + :param traget: The name of the target service + :param timestamp: The incoming message timestamp + :param esek: a base64 encoded encrypted block containing a JSON string + """ + rkey = None + + try: + s = self._crypto.decrypt(key, esek) + j = jsonutils.loads(s) + + rkey = base64.b64decode(j['key']) + expiration = j['timestamp'] + j['ttl'] + if j['timestamp'] > timestamp or timestamp > expiration: + raise InvalidExpiredTicket(source, target) + + except Exception: + raise InvalidEncryptedTicket(source, target) + + info = '%s,%s,%s' % (source, target, str(j['timestamp'])) + + sek = self._hkdf.expand(rkey, info, len(key) * 2) + + return self._split_key(sek, len(key)) + + def _get_ticket(self, target): + """This function will check if we already have a SEK for the specified + target in the cache, or will go and try to fetch a new SEK from the key + server. + + :param target: The name of the target service + """ + ticket = self._key_store.get_ticket(self._name, target) + + if ticket is not None: + return ticket + + tkt, expiration = self._kds.get_ticket(self._name, target, + self._crypto, self._key) + + self._key_store.put_ticket(self._name, target, + base64.b64decode(tkt['skey']), + base64.b64decode(tkt['ekey']), + tkt['esek'], expiration) + return self._key_store.get_ticket(self._name, target) + + def encode(self, version, target, json_msg): + """This is the main encoding function. + + It takes a target and a message and returns a tuple consisting of a + JSON serialized metadata object, a JSON serialized (and optionally + encrypted) message, and a signature. + + :param version: the current envelope version + :param target: The name of the target service (usually with hostname) + :param json_msg: a serialized json message object + """ + ticket = self._get_ticket(target) + + metadata = jsonutils.dumps({'source': self._name, + 'destination': target, + 'timestamp': time.time(), + 'nonce': _get_nonce(), + 'esek': ticket.esek, + 'encryption': self._encrypt}) + + message = json_msg + if self._encrypt: + message = self._crypto.encrypt(ticket.ekey, message) + + signature = self._crypto.sign(ticket.skey, + version + metadata + message) + + return (metadata, message, signature) + + def decode(self, version, metadata, message, signature): + """This is the main decoding function. + + It takes a version, metadata, message and signature strings and + returns a tuple with a (decrypted) message and metadata or raises + an exception in case of error. + + :param version: the current envelope version + :param metadata: a JSON serialized object with metadata for validation + :param message: a JSON serialized (base64 encoded encrypted) message + :param signature: a base64 encoded signature + """ + md = jsonutils.loads(metadata) + + check_args = ('source', 'destination', 'timestamp', + 'nonce', 'esek', 'encryption') + for arg in check_args: + if arg not in md: + raise InvalidMetadata('Missing metadata "%s"' % arg) + + if md['destination'] != self._name: + # TODO(simo) handle group keys by checking target + raise UnknownDestinationName(md['destination']) + + try: + skey, ekey = self._decode_esek(self._key, + md['source'], md['destination'], + md['timestamp'], md['esek']) + except InvalidExpiredTicket: + raise + except Exception: + raise InvalidMetadata('Failed to decode ESEK for %s/%s' % ( + md['source'], md['destination'])) + + sig = self._crypto.sign(skey, version + metadata + message) + + if sig != signature: + raise InvalidSignature(md['source'], md['destination']) + + if md['encryption'] is True: + msg = self._crypto.decrypt(ekey, message) + else: + msg = message + + return (md, msg) diff --git a/trove/openstack/common/rpc/service.py b/trove/openstack/common/rpc/service.py index 233d05b9bd..e03173e69f 100644 --- a/trove/openstack/common/rpc/service.py +++ b/trove/openstack/common/rpc/service.py @@ -17,7 +17,7 @@ # License for the specific language governing permissions and limitations # under the License. -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import log as logging from trove.openstack.common import rpc from trove.openstack.common.rpc import dispatcher as rpc_dispatcher @@ -32,10 +32,11 @@ class Service(service.Service): A service enables rpc by listening to queues based on topic and host. """ - def __init__(self, host, topic, manager=None): + def __init__(self, host, topic, manager=None, serializer=None): super(Service, self).__init__() self.host = host self.topic = topic + self.serializer = serializer if manager is None: self.manager = self else: @@ -48,7 +49,8 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], + self.serializer) # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) diff --git a/trove/openstack/common/rpc/zmq_receiver.py b/trove/openstack/common/rpc/zmq_receiver.py old mode 100755 new mode 100644 index 3037fed4c6..7866f349f9 --- a/trove/openstack/common/rpc/zmq_receiver.py +++ b/trove/openstack/common/rpc/zmq_receiver.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2011 OpenStack Foundation diff --git a/trove/openstack/common/service.py b/trove/openstack/common/service.py index 609c348e13..905931cabd 100644 --- a/trove/openstack/common/service.py +++ b/trove/openstack/common/service.py @@ -32,7 +32,7 @@ import logging as std_logging from oslo.config import cfg from trove.openstack.common import eventlet_backdoor -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa from trove.openstack.common import importutils from trove.openstack.common import log as logging from trove.openstack.common import threadgroup @@ -81,6 +81,15 @@ class Launcher(object): """ self.services.wait() + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() + class SignalExit(SystemExit): def __init__(self, signo, exccode=1): @@ -93,31 +102,51 @@ class ServiceLauncher(Launcher): # Allow the process to be killed again and die from natural causes signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) raise SignalExit(signo) - def wait(self): + def handle_signal(self): signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGHUP, self._handle_signal) + + def _wait_for_exit_or_signal(self): + status = None + signo = 0 LOG.debug(_('Full set of CONF:')) CONF.log_opt_values(LOG, std_logging.DEBUG) - status = None try: super(ServiceLauncher, self).wait() except SignalExit as exc: signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[exc.signo] LOG.info(_('Caught %s, exiting'), signame) status = exc.code + signo = exc.signo except SystemExit as exc: status = exc.code finally: self.stop() if rpc: - rpc.cleanup() - return status + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_('Exception during rpc cleanup.')) + + return status, signo + + def wait(self): + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal() + if signo != signal.SIGHUP: + return status + self.restart() class ServiceWrapper(object): @@ -135,9 +164,12 @@ class ProcessLauncher(object): self.running = True rfd, self.writepipe = os.pipe() self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() + def handle_signal(self): signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGHUP, self._handle_signal) def _handle_signal(self, signo, frame): self.sigcaught = signo @@ -146,6 +178,7 @@ class ProcessLauncher(object): # Allow the process to be killed again and die from natural causes signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) def _pipe_watcher(self): # This will block until the write end is closed when the parent @@ -156,16 +189,47 @@ class ProcessLauncher(object): sys.exit(1) - def _child_process(self, service): + def _child_process_handle_signal(self): # Setup child signal handlers differently def _sigterm(*args): signal.signal(signal.SIGTERM, signal.SIG_DFL) raise SignalExit(signal.SIGTERM) + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + signal.signal(signal.SIGTERM, _sigterm) + signal.signal(signal.SIGHUP, _sighup) # Block SIGINT and let the parent send us a SIGTERM signal.signal(signal.SIGINT, signal.SIG_IGN) + def _child_wait_for_exit_or_signal(self, launcher): + status = None + signo = 0 + + try: + launcher.wait() + except SignalExit as exc: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[exc.signo] + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + # Reopen the eventlet hub to make sure we don't share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() @@ -180,7 +244,7 @@ class ProcessLauncher(object): launcher = Launcher() launcher.launch_service(service) - launcher.wait() + return launcher def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -201,21 +265,13 @@ class ProcessLauncher(object): # NOTE(johannes): All exceptions are caught to ensure this # doesn't fallback into the loop spawning children. It would # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.service) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.service.stop() + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if signo != signal.SIGHUP: + break + launcher.restart() os._exit(status) @@ -261,12 +317,7 @@ class ProcessLauncher(object): wrap.children.remove(pid) return wrap - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - LOG.debug(_('Full set of CONF:')) - CONF.log_opt_values(LOG, std_logging.DEBUG) - + def _respawn_children(self): while self.running: wrap = self._wait_child() if not wrap: @@ -275,14 +326,30 @@ class ProcessLauncher(object): # (see bug #1095346) eventlet.greenthread.sleep(.01) continue - while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap) - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + while True: + self.handle_signal() + self._respawn_children() + if self.sigcaught: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[self.sigcaught] + LOG.info(_('Caught %s, stopping children'), signame) + if self.sigcaught != signal.SIGHUP: + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None for pid in self.children: try: @@ -307,13 +374,19 @@ class Service(object): # signal that the service is done shutting itself down: self._done = event.Event() + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() self.tg.wait() - self._done.send() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() def wait(self): self._done.wait() @@ -336,9 +409,10 @@ class Services(object): service.stop() service.wait() - # each service has performed cleanup, now signal that the run_service + # Each service has performed cleanup, now signal that the run_service # wrapper threads can now die: - self.done.send() + if not self.done.ready(): + self.done.send() # reap threads: self.tg.stop() @@ -346,6 +420,13 @@ class Services(object): def wait(self): self.tg.wait() + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + @staticmethod def run_service(service, done): """Service start wrapper. diff --git a/trove/openstack/common/sslutils.py b/trove/openstack/common/sslutils.py index a07f6f94bd..dac397f72a 100644 --- a/trove/openstack/common/sslutils.py +++ b/trove/openstack/common/sslutils.py @@ -19,7 +19,7 @@ import ssl from oslo.config import cfg -from trove.openstack.common.gettextutils import _ +from trove.openstack.common.gettextutils import _ # noqa ssl_opts = [ @@ -78,3 +78,23 @@ def wrap(sock): ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED return ssl.wrap_socket(sock, **ssl_kwargs) + + +_SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23, + "sslv3": ssl.PROTOCOL_SSLv3 +} + +try: + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 +except AttributeError: + pass + + +def validate_ssl_version(version): + key = version.lower() + try: + return _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/trove/openstack/common/threadgroup.py b/trove/openstack/common/threadgroup.py index 7eadac7847..c5b6aa04f6 100644 --- a/trove/openstack/common/threadgroup.py +++ b/trove/openstack/common/threadgroup.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenlet +import eventlet from eventlet import greenpool from eventlet import greenthread @@ -105,7 +105,7 @@ class ThreadGroup(object): for x in self.timers: try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) @@ -115,7 +115,7 @@ class ThreadGroup(object): continue try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) diff --git a/trove/openstack/common/timeutils.py b/trove/openstack/common/timeutils.py index bd60489e56..aa9f708074 100644 --- a/trove/openstack/common/timeutils.py +++ b/trove/openstack/common/timeutils.py @@ -49,9 +49,9 @@ def parse_isotime(timestr): try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) except TypeError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) def strtime(at=None, fmt=PERFECT_TIME_FORMAT):