oslo-incubator rpc update

* updated rpc oslo code
Addresses blueprint trove/update-oslo

Change-Id: I65cfbc860b139638d460dca47ce114985ddd658c
This commit is contained in:
Craig Vyvial 2013-08-27 13:33:00 -05:00 committed by Nikhil Manchanda
parent c05fc3fbe0
commit fbf4a56efd
28 changed files with 1118 additions and 201 deletions

View File

@ -24,3 +24,4 @@ pexpect
-f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3
oslo.config>=1.2.0a3
mysql-python
Babel>=0.9.6

View File

@ -34,7 +34,7 @@ def delete_queue(context, topic):
if CONF.rpc_backend == "trove.openstack.common.rpc.impl_kombu":
connection = openstack_rpc.create_connection()
channel = connection.channel
durable = connection.conf.rabbit_durable_queues
durable = connection.conf.amqp_durable_queues
queue = kombu.entity.Queue(name=topic, channel=channel,
auto_delete=False, exclusive=False,
durable=durable)

View File

@ -0,0 +1,179 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from Crypto.Hash import HMAC
from Crypto import Random
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
class CryptoutilsException(Exception):
"""Generic Exception for Crypto utilities."""
message = _("An unknown error occurred in crypto utils.")
class CipherBlockLengthTooBig(CryptoutilsException):
"""The block size is too big."""
def __init__(self, requested, permitted):
msg = _("Block size of %(given)d is too big, max = %(maximum)d")
message = msg % {'given': requested, 'maximum': permitted}
super(CryptoutilsException, self).__init__(message)
class HKDFOutputLengthTooLong(CryptoutilsException):
"""The amount of Key Material asked is too much."""
def __init__(self, requested, permitted):
msg = _("Length of %(given)d is too long, max = %(maximum)d")
message = msg % {'given': requested, 'maximum': permitted}
super(CryptoutilsException, self).__init__(message)
class HKDF(object):
"""An HMAC-based Key Derivation Function implementation (RFC5869)
This class creates an object that allows to use HKDF to derive keys.
"""
def __init__(self, hashtype='SHA256'):
self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype)
self.max_okm_length = 255 * self.hashfn.digest_size
def extract(self, ikm, salt=None):
"""An extract function that can be used to derive a robust key given
weak Input Key Material (IKM) which could be a password.
Returns a pseudorandom key (of HashLen octets)
:param ikm: input keying material (ex a password)
:param salt: optional salt value (a non-secret random value)
"""
if salt is None:
salt = '\x00' * self.hashfn.digest_size
return HMAC.new(salt, ikm, self.hashfn).digest()
def expand(self, prk, info, length):
"""An expand function that will return arbitrary length output that can
be used as keys.
Returns a buffer usable as key material.
:param prk: a pseudorandom key of at least HashLen octets
:param info: optional string (can be a zero-length string)
:param length: length of output keying material (<= 255 * HashLen)
"""
if length > self.max_okm_length:
raise HKDFOutputLengthTooLong(length, self.max_okm_length)
N = (length + self.hashfn.digest_size - 1) / self.hashfn.digest_size
okm = ""
tmp = ""
for block in range(1, N + 1):
tmp = HMAC.new(prk, tmp + info + chr(block), self.hashfn).digest()
okm += tmp
return okm[:length]
MAX_CB_SIZE = 256
class SymmetricCrypto(object):
"""Symmetric Key Crypto object.
This class creates a Symmetric Key Crypto object that can be used
to encrypt, decrypt, or sign arbitrary data.
:param enctype: Encryption Cipher name (default: AES)
:param hashtype: Hash/HMAC type name (default: SHA256)
"""
def __init__(self, enctype='AES', hashtype='SHA256'):
self.cipher = importutils.import_module('Crypto.Cipher.' + enctype)
self.hashfn = importutils.import_module('Crypto.Hash.' + hashtype)
def new_key(self, size):
return Random.new().read(size)
def encrypt(self, key, msg, b64encode=True):
"""Encrypt the provided msg and returns the cyphertext optionally
base64 encoded.
Uses AES-128-CBC with a Random IV by default.
The plaintext is padded to reach blocksize length.
The last byte of the block is the length of the padding.
The length of the padding does not include the length byte itself.
:param key: The Encryption key.
:param msg: the plain text.
:returns encblock: a block of encrypted data.
"""
iv = Random.new().read(self.cipher.block_size)
cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv)
# CBC mode requires a fixed block size. Append padding and length of
# padding.
if self.cipher.block_size > MAX_CB_SIZE:
raise CipherBlockLengthTooBig(self.cipher.block_size, MAX_CB_SIZE)
r = len(msg) % self.cipher.block_size
padlen = self.cipher.block_size - r - 1
msg += '\x00' * padlen
msg += chr(padlen)
enc = iv + cipher.encrypt(msg)
if b64encode:
enc = base64.b64encode(enc)
return enc
def decrypt(self, key, msg, b64decode=True):
"""Decrypts the provided ciphertext, optionally base 64 encoded, and
returns the plaintext message, after padding is removed.
Uses AES-128-CBC with an IV by default.
:param key: The Encryption key.
:param msg: the ciphetext, the first block is the IV
"""
if b64decode:
msg = base64.b64decode(msg)
iv = msg[:self.cipher.block_size]
cipher = self.cipher.new(key, self.cipher.MODE_CBC, iv)
padded = cipher.decrypt(msg[self.cipher.block_size:])
l = ord(padded[-1]) + 1
plain = padded[:-l]
return plain
def sign(self, key, msg, b64encode=True):
"""Signs a message string and returns a base64 encoded signature.
Uses HMAC-SHA-256 by default.
:param key: The Signing key.
:param msg: the message to sign.
"""
h = HMAC.new(key, msg, self.hashfn)
out = h.digest()
if b64encode:
out = base64.b64encode(out)
return out

View File

@ -31,20 +31,20 @@ import eventlet.backdoor
import greenlet
from oslo.config import cfg
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import log as logging
help_for_backdoor_port = 'Acceptable ' + \
'values are 0, <port> and <start>:<end>, where 0 results in ' + \
'listening on a random tcp port number, <port> results in ' + \
'listening on the specified port number and not enabling backdoor' + \
'if it is in use and <start>:<end> results in listening on the ' + \
'smallest unused port number within the specified range of port ' + \
'numbers. The chosen port is displayed in the service\'s log file.'
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
cfg.StrOpt('backdoor_port',
default=None,
help='Enable eventlet backdoor. %s' % help_for_backdoor_port)
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF

View File

@ -24,7 +24,9 @@ import sys
import time
import traceback
from trove.openstack.common.gettextutils import _
import six
from trove.openstack.common.gettextutils import _ # noqa
class save_and_reraise_exception(object):
@ -65,7 +67,7 @@ class save_and_reraise_exception(object):
self.tb))
return False
if self.reraise:
raise self.type_, self.value, self.tb
six.reraise(self.type_, self.value, self.tb)
def forever_retry_uncaught_exceptions(infunc):
@ -77,7 +79,8 @@ def forever_retry_uncaught_exceptions(infunc):
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
this_exc_message = unicode(exc)
if this_exc_message == last_exc_message:
exc_count += 1
else:
exc_count = 1
@ -85,12 +88,12 @@ def forever_retry_uncaught_exceptions(infunc):
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
this_exc_message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
last_exc_message = this_exc_message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.

View File

@ -1,8 +1,8 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -26,19 +26,44 @@ Usual usage in an openstack.common module:
import copy
import gettext
import logging.handlers
import logging
import os
import UserString
import re
try:
import UserString as _userString
except ImportError:
import collections as _userString
from babel import localedata
import six
_localedir = os.environ.get('trove'.upper() + '_LOCALEDIR')
_t = gettext.translation('trove', localedir=_localedir, fallback=True)
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
def enable_lazy():
"""Convenience function for configuring _() to use lazy gettext
Call this at the start of execution to enable the gettextutils._
function to use lazy gettext functionality. This is useful if
your project is importing _ directly instead of using the
gettextutils.install() way of importing the _ function.
"""
global USE_LAZY
USE_LAZY = True
def _(msg):
return _t.ugettext(msg)
if USE_LAZY:
return Message(msg, 'trove')
else:
return _t.ugettext(msg)
def install(domain):
def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
Given a translation domain, install a _() function using gettext's
@ -48,44 +73,48 @@ def install(domain):
overriding the default localedir (e.g. /usr/share/locale) using
a translation-domain-specific environment variable (e.g.
NOVA_LOCALEDIR).
:param domain: the translation domain
:param lazy: indicates whether or not to install the lazy _() function.
The lazy _() introduces a way to do deferred translation
of messages by installing a _ that builds Message objects,
instead of strings, which can then be lazily translated into
any available locale.
"""
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)
if lazy:
# NOTE(mrodden): Lazy gettext functionality.
#
# The following introduces a deferred way to do translations on
# messages in OpenStack. We override the standard _() function
# and % (format string) operation to build Message objects that can
# later be translated when we have more information.
#
# Also included below is an example LocaleHandler that translates
# Messages to an associated locale, effectively allowing many logs,
# each with their own locale.
def _lazy_gettext(msg):
"""Create and return a Message object.
Lazy gettext function for a given domain, it is a factory method
for a project/module to get a lazy gettext function for its own
translation domain (i.e. nova, glance, cinder, etc.)
Message encapsulates a string so that we can translate
it later when needed.
"""
return Message(msg, domain)
import __builtin__
__builtin__.__dict__['_'] = _lazy_gettext
else:
localedir = '%s_LOCALEDIR' % domain.upper()
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
"""
Lazy gettext functionality.
The following is an attempt to introduce a deferred way
to do translations on messages in OpenStack. We attempt to
override the standard _() function and % (format string) operation
to build Message objects that can later be translated when we have
more information. Also included is an example LogHandler that
translates Messages to an associated locale, effectively allowing
many logs, each with their own locale.
"""
def get_lazy_gettext(domain):
"""Assemble and return a lazy gettext function for a given domain.
Factory method for a project/module to get a lazy gettext function
for its own translation domain (i.e. nova, glance, cinder, etc.)
"""
def _lazy_gettext(msg):
"""Create and return a Message object.
Message encapsulates a string so that we can translate it later when
needed.
"""
return Message(msg, domain)
return _lazy_gettext
class Message(UserString.UserString, object):
class Message(_userString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
@ -120,7 +149,29 @@ class Message(UserString.UserString, object):
if self.params is not None:
full_msg = full_msg % self.params
return unicode(full_msg)
return six.text_type(full_msg)
def _save_dictionary_parameter(self, dict_param):
full_msg = self.data
# look for %(blah) fields in string;
# ignore %% and deal with the
# case where % is first character on the line
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg)
# if we don't find any %(blah) blocks but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
# apparently the full dictionary is the parameter
params = copy.deepcopy(dict_param)
else:
params = {}
for key in keys:
try:
params[key] = copy.deepcopy(dict_param[key])
except TypeError:
# cast uncopyable thing to unicode string
params[key] = unicode(dict_param[key])
return params
def _save_parameters(self, other):
# we check for None later to see if
@ -128,8 +179,16 @@ class Message(UserString.UserString, object):
# so encapsulate if our parameter is actually None
if other is None:
self.params = (other, )
elif isinstance(other, dict):
self.params = self._save_dictionary_parameter(other)
else:
self.params = copy.deepcopy(other)
# fallback to casting to unicode,
# this will handle the problematic python code-like
# objects that cannot be deep-copied
try:
self.params = copy.deepcopy(other)
except TypeError:
self.params = unicode(other)
return self
@ -196,7 +255,47 @@ class Message(UserString.UserString, object):
if name in ops:
return getattr(self.data, name)
else:
return UserString.UserString.__getattribute__(self, name)
return _userString.UserString.__getattribute__(self, name)
def get_available_languages(domain):
"""Lists the available languages for the given translation domain.
:param domain: the domain to get languages for
"""
if domain in _AVAILABLE_LANGUAGES:
return copy.copy(_AVAILABLE_LANGUAGES[domain])
localedir = '%s_LOCALEDIR' % domain.upper()
find = lambda x: gettext.find(domain,
localedir=os.environ.get(localedir),
languages=[x])
# NOTE(mrodden): en_US should always be available (and first in case
# order matters) since our in-line message strings are en_US
language_list = ['en_US']
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
# this check when the master list updates to >=1.0, and all projects udpate
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
language_list.append(i)
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
if isinstance(message, Message):
if user_locale:
message.locale = user_locale
return unicode(message)
else:
return message
class LocaleHandler(logging.Handler):

View File

@ -38,14 +38,18 @@ import functools
import inspect
import itertools
import json
import types
import xmlrpclib
try:
import xmlrpclib
except ImportError:
# NOTE(jd): xmlrpclib is not shipped with Python 3
xmlrpclib = None
import netaddr
import six
from trove.openstack.common import importutils
from trove.openstack.common import timeutils
netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
@ -53,7 +57,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
_simple_types = (six.string_types + six.integer_types
+ (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
@ -125,7 +130,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
@ -138,7 +143,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
elif isinstance(value, netaddr.IPAddress):
elif netaddr and isinstance(value, netaddr.IPAddress):
return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):

View File

@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Greenthread local storage of variables using weak references"""
"""Local storage of variables using weak references"""
import threading
import weakref
from eventlet import corolocal
class WeakLocal(corolocal.local):
class WeakLocal(threading.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
rval = super(WeakLocal, self).__getattribute__(attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
@ -34,7 +33,7 @@ class WeakLocal(corolocal.local):
def __setattr__(self, attr, value):
value = weakref.ref(value)
return corolocal.local.__setattr__(self, attr, value)
return super(WeakLocal, self).__setattr__(attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
@ -45,4 +44,4 @@ store = WeakLocal()
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local
strong_store = threading.local()

View File

@ -29,8 +29,6 @@ It also allows setting of formatting information through conf.
"""
import ConfigParser
import cStringIO
import inspect
import itertools
import logging
@ -41,8 +39,9 @@ import sys
import traceback
from oslo.config import cfg
from six import moves
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import jsonutils
from trove.openstack.common import local
@ -348,7 +347,7 @@ class LogConfigError(Exception):
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
except ConfigParser.Error as exc:
except moves.configparser.Error as exc:
raise LogConfigError(log_config, str(exc))
@ -521,7 +520,7 @@ class ContextFormatter(logging.Formatter):
if not record:
return logging.Formatter.formatException(self, exc_info)
stringbuffer = cStringIO.StringIO()
stringbuffer = moves.StringIO()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, stringbuffer)
lines = stringbuffer.getvalue().split('\n')

View File

@ -22,7 +22,7 @@ import sys
from eventlet import event
from eventlet import greenthread
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import log as logging
from trove.openstack.common import timeutils

View File

@ -29,7 +29,7 @@ import inspect
from oslo.config import cfg
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import local
from trove.openstack.common import log as logging
@ -56,8 +56,7 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['trove.openstack.common.exception',
'nova.exception',
default=['nova.exception',
'cinder.exception',
'exceptions',
],

View File

@ -34,14 +34,28 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
from oslo.config import cfg
from trove.openstack.common import excutils
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import local
from trove.openstack.common import log as logging
from trove.openstack.common.rpc import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
deprecated_group='DEFAULT',
help='Use durable queues in amqp.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
help='Auto-delete queues in amqp.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@ -286,8 +300,13 @@ def pack_context(msg, context):
for args at some point.
"""
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
if isinstance(context, dict):
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.iteritems()])
else:
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)

View File

@ -24,7 +24,7 @@ import traceback
from oslo.config import cfg
import six
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import jsonutils
from trove.openstack.common import local
@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
message = self.message % kwargs
message = self.msg_fmt % kwargs
except Exception:
# kwargs doesn't match a variable in the message
@ -90,7 +90,7 @@ class RPCException(Exception):
for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
message = self.message
message = self.msg_fmt
super(RPCException, self).__init__(message)
@ -104,7 +104,7 @@ class RemoteError(RPCException):
contains all of the relevant info.
"""
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
@ -121,7 +121,7 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _('Timeout while waiting on RPC response - '
msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
@ -144,25 +144,25 @@ class Timeout(RPCException):
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException):
message = _("Specified RPC version, %(version)s, not supported by "
msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):

View File

@ -146,7 +146,7 @@ def multicall(conf, context, topic, msg, timeout=None):
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
raise rpc_common.Timeout("No consumers available")
else:
return consumer.call(context, version, method, namespace, args,
timeout)

View File

@ -30,15 +30,19 @@ import kombu.messaging
from oslo.config import cfg
from trove.openstack.common import excutils
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import network_utils
from trove.openstack.common.rpc import amqp as rpc_amqp
from trove.openstack.common.rpc import common as rpc_common
from trove.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
help='SSL version to use (valid only if SSL enabled)'),
help='SSL version to use (valid only if SSL enabled). '
'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions'
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
@ -82,12 +86,6 @@ kombu_opts = [
default=0,
help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
cfg.IntOpt('rabbit_heartbeat',
default=60,
help='Seconds between connection keepalive heartbeats'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@ -148,29 +146,23 @@ class ConsumerBase(object):
Messages that are processed without exception are ack'ed.
If the message processing generates an exception, it will be
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
Rejection is better than waiting for the message to timeout.
Rejected messages are immediately requeued.
ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
"""
ack_msg = False
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
ack_msg = True
except Exception:
if self.ack_on_error:
ack_msg = True
LOG.exception(_("Failed to process message"
" ... skipping it."))
message.ack()
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
finally:
if ack_msg:
message.ack()
else:
message.reject()
message.requeue()
else:
message.ack()
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
@ -259,9 +251,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@ -365,8 +357,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.rabbit_durable_queues,
'auto_delete': False,
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
@ -396,7 +388,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
@ -452,7 +444,6 @@ class Connection(object):
'userid': self.conf.rabbit_userid,
'password': self.conf.rabbit_password,
'virtual_host': self.conf.rabbit_virtual_host,
'heartbeat': self.conf.rabbit_heartbeat,
}
for sp_key, value in server_params.iteritems():
@ -481,7 +472,8 @@ class Connection(object):
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.conf.kombu_ssl_version
ssl_params['ssl_version'] = sslutils.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
@ -492,12 +484,8 @@ class Connection(object):
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
if not ssl_params:
# Just have the default behavior
return True
else:
# Return the extended behavior
return ssl_params
# Return the extended behavior or just have the default behavior
return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have

View File

@ -25,7 +25,7 @@ import greenlet
from oslo.config import cfg
from trove.openstack.common import excutils
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import jsonutils
from trove.openstack.common import log as logging
@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase):
@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase):
@ -228,7 +238,7 @@ class FanoutConsumer(ConsumerBase):
{"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name()
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
@ -310,7 +320,7 @@ class DirectPublisher(Publisher):
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})
{"type": "direct"})
class TopicPublisher(Publisher):

View File

@ -27,7 +27,7 @@ import greenlet
from oslo.config import cfg
from trove.openstack.common import excutils
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import jsonutils
from trove.openstack.common.rpc import common as rpc_common
@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered"))
def consume_in_thread(self):
@excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:

View File

@ -23,7 +23,7 @@ import contextlib
import eventlet
from oslo.config import cfg
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import log as logging
@ -248,9 +248,7 @@ class DirectBinding(Binding):
that it maps directly to a host, thus direct.
"""
def test(self, key):
if '.' in key:
return True
return False
return '.' in key
class TopicBinding(Binding):
@ -262,17 +260,13 @@ class TopicBinding(Binding):
matches that of a direct exchange.
"""
def test(self, key):
if '.' not in key:
return True
return False
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
if key.startswith('fanout~'):
return True
return False
return key.startswith('fanout~')
class StubExchange(Exchange):

View File

@ -23,7 +23,7 @@ import json
from oslo.config import cfg
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import log as logging
from trove.openstack.common.rpc import matchmaker as mm
@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
return key in self.ring0
class RoundRobinRingExchange(RingExchange):

View File

@ -69,7 +69,7 @@ class RpcProxy(object):
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):

View File

@ -0,0 +1,521 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import collections
import os
import struct
import time
import requests
from oslo.config import cfg
from trove.openstack.common.crypto import utils as cryptoutils
from trove.openstack.common import jsonutils
from trove.openstack.common import log as logging
secure_message_opts = [
cfg.BoolOpt('enabled', default=True,
help='Whether Secure Messaging (Signing) is enabled,'
' defaults to enabled'),
cfg.BoolOpt('enforced', default=False,
help='Whether Secure Messaging (Signing) is enforced,'
' defaults to not enforced'),
cfg.BoolOpt('encrypt', default=False,
help='Whether Secure Messaging (Encryption) is enabled,'
' defaults to not enabled'),
cfg.StrOpt('secret_keys_file',
help='Path to the file containing the keys, takes precedence'
' over secret_key'),
cfg.MultiStrOpt('secret_key',
help='A list of keys: (ex: name:<base64 encoded key>),'
' ignored if secret_keys_file is set'),
cfg.StrOpt('kds_endpoint',
help='KDS endpoint (ex: http://kds.example.com:35357/v3)'),
]
secure_message_group = cfg.OptGroup('secure_messages',
title='Secure Messaging options')
LOG = logging.getLogger(__name__)
class SecureMessageException(Exception):
"""Generic Exception for Secure Messages."""
msg = "An unknown Secure Message related exception occurred."
def __init__(self, msg=None):
if msg is None:
msg = self.msg
super(SecureMessageException, self).__init__(msg)
class SharedKeyNotFound(SecureMessageException):
"""No shared key was found and no other external authentication mechanism
is available.
"""
msg = "Shared Key for [%s] Not Found. (%s)"
def __init__(self, name, errmsg):
super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg))
class InvalidMetadata(SecureMessageException):
"""The metadata is invalid."""
msg = "Invalid metadata: %s"
def __init__(self, err):
super(InvalidMetadata, self).__init__(self.msg % err)
class InvalidSignature(SecureMessageException):
"""Signature validation failed."""
msg = "Failed to validate signature (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidSignature, self).__init__(self.msg % (src, dst))
class UnknownDestinationName(SecureMessageException):
"""The Destination name is unknown to us."""
msg = "Invalid destination name (%s)"
def __init__(self, name):
super(UnknownDestinationName, self).__init__(self.msg % name)
class InvalidEncryptedTicket(SecureMessageException):
"""The Encrypted Ticket could not be successfully handled."""
msg = "Invalid Ticket (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst))
class InvalidExpiredTicket(SecureMessageException):
"""The ticket received is already expired."""
msg = "Expired ticket (source=%s, destination=%s)"
def __init__(self, src, dst):
super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst))
class CommunicationError(SecureMessageException):
"""The Communication with the KDS failed."""
msg = "Communication Error (target=%s): %s"
def __init__(self, target, errmsg):
super(CommunicationError, self).__init__(self.msg % (target, errmsg))
class InvalidArgument(SecureMessageException):
"""Bad initialization argument."""
msg = "Invalid argument: %s"
def __init__(self, errmsg):
super(InvalidArgument, self).__init__(self.msg % errmsg)
Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek'])
class KeyStore(object):
"""A storage class for Signing and Encryption Keys.
This class creates an object that holds Generic Keys like Signing
Keys, Encryption Keys, Encrypted SEK Tickets ...
"""
def __init__(self):
self._kvps = dict()
def _get_key_name(self, source, target, ktype):
return (source, target, ktype)
def _put(self, src, dst, ktype, expiration, data):
name = self._get_key_name(src, dst, ktype)
self._kvps[name] = (expiration, data)
def _get(self, src, dst, ktype):
name = self._get_key_name(src, dst, ktype)
if name in self._kvps:
expiration, data = self._kvps[name]
if expiration > time.time():
return data
else:
del self._kvps[name]
return None
def clear(self):
"""Wipes the store clear of all data."""
self._kvps.clear()
def put_ticket(self, source, target, skey, ekey, esek, expiration):
"""Puts a sek pair in the cache.
:param source: Client name
:param target: Target name
:param skey: The Signing Key
:param ekey: The Encription Key
:param esek: The token encrypted with the target key
:param expiration: Expiration time in seconds since Epoch
"""
keys = Ticket(skey, ekey, esek)
self._put(source, target, 'ticket', expiration, keys)
def get_ticket(self, source, target):
"""Returns a Ticket (skey, ekey, esek) namedtuple for the
source/target pair.
"""
return self._get(source, target, 'ticket')
_KEY_STORE = KeyStore()
class _KDSClient(object):
USER_AGENT = 'oslo-incubator/rpc'
def __init__(self, endpoint=None, timeout=None):
"""A KDS Client class."""
self._endpoint = endpoint
if timeout is not None:
self.timeout = float(timeout)
else:
self.timeout = None
def _do_get(self, url, request):
req_kwargs = dict()
req_kwargs['headers'] = dict()
req_kwargs['headers']['User-Agent'] = self.USER_AGENT
req_kwargs['headers']['Content-Type'] = 'application/json'
req_kwargs['data'] = jsonutils.dumps({'request': request})
if self.timeout is not None:
req_kwargs['timeout'] = self.timeout
try:
resp = requests.get(url, **req_kwargs)
except requests.ConnectionError as e:
err = "Unable to establish connection. %s" % e
raise CommunicationError(url, err)
return resp
def _get_reply(self, url, resp):
if resp.text:
try:
body = jsonutils.loads(resp.text)
reply = body['reply']
except (KeyError, TypeError, ValueError):
msg = "Failed to decode reply: %s" % resp.text
raise CommunicationError(url, msg)
else:
msg = "No reply data was returned."
raise CommunicationError(url, msg)
return reply
def _get_ticket(self, request, url=None, redirects=10):
"""Send an HTTP request.
Wraps around 'requests' to handle redirects and common errors.
"""
if url is None:
if not self._endpoint:
raise CommunicationError(url, 'Endpoint not configured')
url = self._endpoint + '/kds/ticket'
while redirects:
resp = self._do_get(url, request)
if resp.status_code in (301, 302, 305):
# Redirected. Reissue the request to the new location.
url = resp.headers['location']
redirects -= 1
continue
elif resp.status_code != 200:
msg = "Request returned failure status: %s (%s)"
err = msg % (resp.status_code, resp.text)
raise CommunicationError(url, err)
return self._get_reply(url, resp)
raise CommunicationError(url, "Too many redirections, giving up!")
def get_ticket(self, source, target, crypto, key):
# prepare metadata
md = {'requestor': source,
'target': target,
'timestamp': time.time(),
'nonce': struct.unpack('Q', os.urandom(8))[0]}
metadata = base64.b64encode(jsonutils.dumps(md))
# sign metadata
signature = crypto.sign(key, metadata)
# HTTP request
reply = self._get_ticket({'metadata': metadata,
'signature': signature})
# verify reply
signature = crypto.sign(key, (reply['metadata'] + reply['ticket']))
if signature != reply['signature']:
raise InvalidEncryptedTicket(md['source'], md['destination'])
md = jsonutils.loads(base64.b64decode(reply['metadata']))
if ((md['source'] != source or
md['destination'] != target or
md['expiration'] < time.time())):
raise InvalidEncryptedTicket(md['source'], md['destination'])
# return ticket data
tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket']))
return tkt, md['expiration']
# we need to keep a global nonce, as this value should never repeat non
# matter how many SecureMessage objects we create
_NONCE = None
def _get_nonce():
"""We keep a single counter per instance, as it is so huge we can't
possibly cycle through within 1/100 of a second anyway.
"""
global _NONCE
# Lazy initialize, for now get a random value, multiply by 2^32 and
# use it as the nonce base. The counter itself will rotate after
# 2^32 increments.
if _NONCE is None:
_NONCE = [struct.unpack('I', os.urandom(4))[0], 0]
# Increment counter and wrap at 2^32
_NONCE[1] += 1
if _NONCE[1] > 0xffffffff:
_NONCE[1] = 0
# Return base + counter
return long((_NONCE[0] * 0xffffffff)) + _NONCE[1]
class SecureMessage(object):
"""A Secure Message object.
This class creates a signing/encryption facility for RPC messages.
It encapsulates all the necessary crypto primitives to insulate
regular code from the intricacies of message authentication, validation
and optionally encryption.
:param topic: The topic name of the queue
:param host: The server name, together with the topic it forms a unique
name that is used to source signing keys, and verify
incoming messages.
:param conf: a ConfigOpts object
:param key: (optional) explicitly pass in endpoint private key.
If not provided it will be sourced from the service config
:param key_store: (optional) Storage class for local caching
:param encrypt: (defaults to False) Whether to encrypt messages
:param enctype: (defaults to AES) Cipher to use
:param hashtype: (defaults to SHA256) Hash function to use for signatures
"""
def __init__(self, topic, host, conf, key=None, key_store=None,
encrypt=None, enctype='AES', hashtype='SHA256'):
conf.register_group(secure_message_group)
conf.register_opts(secure_message_opts, group='secure_messages')
self._name = '%s.%s' % (topic, host)
self._key = key
self._conf = conf.secure_messages
self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt
self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype)
self._hkdf = cryptoutils.HKDF(hashtype)
self._kds = _KDSClient(self._conf.kds_endpoint)
if self._key is None:
self._key = self._init_key(topic, self._name)
if self._key is None:
err = "Secret Key (or key file) is missing or malformed"
raise SharedKeyNotFound(self._name, err)
self._key_store = key_store or _KEY_STORE
def _init_key(self, topic, name):
keys = None
if self._conf.secret_keys_file:
with open(self._conf.secret_keys_file, 'r') as f:
keys = f.readlines()
elif self._conf.secret_key:
keys = self._conf.secret_key
if keys is None:
return None
for k in keys:
if k[0] == '#':
continue
if ':' not in k:
break
svc, key = k.split(':', 1)
if svc == topic or svc == name:
return base64.b64decode(key)
return None
def _split_key(self, key, size):
sig_key = key[:size]
enc_key = key[size:]
return sig_key, enc_key
def _decode_esek(self, key, source, target, timestamp, esek):
"""This function decrypts the esek buffer passed in and returns a
KeyStore to be used to check and decrypt the received message.
:param key: The key to use to decrypt the ticket (esek)
:param source: The name of the source service
:param traget: The name of the target service
:param timestamp: The incoming message timestamp
:param esek: a base64 encoded encrypted block containing a JSON string
"""
rkey = None
try:
s = self._crypto.decrypt(key, esek)
j = jsonutils.loads(s)
rkey = base64.b64decode(j['key'])
expiration = j['timestamp'] + j['ttl']
if j['timestamp'] > timestamp or timestamp > expiration:
raise InvalidExpiredTicket(source, target)
except Exception:
raise InvalidEncryptedTicket(source, target)
info = '%s,%s,%s' % (source, target, str(j['timestamp']))
sek = self._hkdf.expand(rkey, info, len(key) * 2)
return self._split_key(sek, len(key))
def _get_ticket(self, target):
"""This function will check if we already have a SEK for the specified
target in the cache, or will go and try to fetch a new SEK from the key
server.
:param target: The name of the target service
"""
ticket = self._key_store.get_ticket(self._name, target)
if ticket is not None:
return ticket
tkt, expiration = self._kds.get_ticket(self._name, target,
self._crypto, self._key)
self._key_store.put_ticket(self._name, target,
base64.b64decode(tkt['skey']),
base64.b64decode(tkt['ekey']),
tkt['esek'], expiration)
return self._key_store.get_ticket(self._name, target)
def encode(self, version, target, json_msg):
"""This is the main encoding function.
It takes a target and a message and returns a tuple consisting of a
JSON serialized metadata object, a JSON serialized (and optionally
encrypted) message, and a signature.
:param version: the current envelope version
:param target: The name of the target service (usually with hostname)
:param json_msg: a serialized json message object
"""
ticket = self._get_ticket(target)
metadata = jsonutils.dumps({'source': self._name,
'destination': target,
'timestamp': time.time(),
'nonce': _get_nonce(),
'esek': ticket.esek,
'encryption': self._encrypt})
message = json_msg
if self._encrypt:
message = self._crypto.encrypt(ticket.ekey, message)
signature = self._crypto.sign(ticket.skey,
version + metadata + message)
return (metadata, message, signature)
def decode(self, version, metadata, message, signature):
"""This is the main decoding function.
It takes a version, metadata, message and signature strings and
returns a tuple with a (decrypted) message and metadata or raises
an exception in case of error.
:param version: the current envelope version
:param metadata: a JSON serialized object with metadata for validation
:param message: a JSON serialized (base64 encoded encrypted) message
:param signature: a base64 encoded signature
"""
md = jsonutils.loads(metadata)
check_args = ('source', 'destination', 'timestamp',
'nonce', 'esek', 'encryption')
for arg in check_args:
if arg not in md:
raise InvalidMetadata('Missing metadata "%s"' % arg)
if md['destination'] != self._name:
# TODO(simo) handle group keys by checking target
raise UnknownDestinationName(md['destination'])
try:
skey, ekey = self._decode_esek(self._key,
md['source'], md['destination'],
md['timestamp'], md['esek'])
except InvalidExpiredTicket:
raise
except Exception:
raise InvalidMetadata('Failed to decode ESEK for %s/%s' % (
md['source'], md['destination']))
sig = self._crypto.sign(skey, version + metadata + message)
if sig != signature:
raise InvalidSignature(md['source'], md['destination'])
if md['encryption'] is True:
msg = self._crypto.decrypt(ekey, message)
else:
msg = message
return (md, msg)

View File

@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import log as logging
from trove.openstack.common import rpc
from trove.openstack.common.rpc import dispatcher as rpc_dispatcher
@ -32,10 +32,11 @@ class Service(service.Service):
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None):
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
@ -48,7 +49,8 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)

1
trove/openstack/common/rpc/zmq_receiver.py Executable file → Normal file
View File

@ -1,4 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation

View File

@ -32,7 +32,7 @@ import logging as std_logging
from oslo.config import cfg
from trove.openstack.common import eventlet_backdoor
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
from trove.openstack.common import importutils
from trove.openstack.common import log as logging
from trove.openstack.common import threadgroup
@ -81,6 +81,15 @@ class Launcher(object):
"""
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
@ -93,31 +102,51 @@ class ServiceLauncher(Launcher):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signo)
def wait(self):
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
self.stop()
if rpc:
rpc.cleanup()
return status
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
def wait(self):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
return status
self.restart()
class ServiceWrapper(object):
@ -135,9 +164,12 @@ class ProcessLauncher(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
@ -146,6 +178,7 @@ class ProcessLauncher(object):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@ -156,16 +189,47 @@ class ProcessLauncher(object):
sys.exit(1)
def _child_process(self, service):
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = None
signo = 0
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
@ -180,7 +244,7 @@ class ProcessLauncher(object):
launcher = Launcher()
launcher.launch_service(service)
launcher.wait()
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@ -201,21 +265,13 @@ class ProcessLauncher(object):
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.service)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if signo != signal.SIGHUP:
break
launcher.restart()
os._exit(status)
@ -261,12 +317,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
@ -275,14 +326,30 @@ class ProcessLauncher(object):
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
@ -307,13 +374,19 @@ class Service(object):
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
def stop(self):
self.tg.stop()
self.tg.wait()
self._done.send()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
@ -336,9 +409,10 @@ class Services(object):
service.stop()
service.wait()
# each service has performed cleanup, now signal that the run_service
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
self.done.send()
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
@ -346,6 +420,13 @@ class Services(object):
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.

View File

@ -19,7 +19,7 @@ import ssl
from oslo.config import cfg
from trove.openstack.common.gettextutils import _
from trove.openstack.common.gettextutils import _ # noqa
ssl_opts = [
@ -78,3 +78,23 @@ def wrap(sock):
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23,
"sslv3": ssl.PROTOCOL_SSLv3
}
try:
_SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
except AttributeError:
pass
def validate_ssl_version(version):
key = version.lower()
try:
return _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(_("Invalid SSL version : %s") % version)

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from eventlet import greenlet
import eventlet
from eventlet import greenpool
from eventlet import greenthread
@ -105,7 +105,7 @@ class ThreadGroup(object):
for x in self.timers:
try:
x.wait()
except greenlet.GreenletExit:
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
@ -115,7 +115,7 @@ class ThreadGroup(object):
continue
try:
x.wait()
except greenlet.GreenletExit:
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)

View File

@ -49,9 +49,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
except TypeError as e:
raise ValueError(e.message)
raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):