Synced rpc and gettextutils modules from oslo-incubator
The main reason for sync is to get the following oslo-rpc fixes in Neutron:

* I537015f452eb770acba41fdedfe221628f52a920 (reduces delays when reconnecting to Qpid in HA deployments)
* Ia148baa6e1ec632789ac3621c85173c2c16f3918 (fixed HA failover, Qpid part)
* I67923cb024bbd143edc8edccf35b9b400df31eb3 (fixed HA failover, RabbitMQ part)

Latest oslo-incubator commit at the moment of sync:

* 2eab986ef3c43f8d1e25065e3cbc1307860c25c7

Change-Id: I2f5bb0d195e050f755ecdbf06a6bbed587a04fbe
Closes-Bug: 1281148
Closes-Bug: 1261631
parent 174825c549
commit 52fefa7839
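Note on the failover fixes: both the RabbitMQ (kombu) and Qpid drivers used to pick the next broker with a position counter that effectively restarted at the first host, so a dead first broker could be retried on every reconnect. The sync replaces that with a per-connection itertools.cycle iterator (see the next_broker_indices hunks below). A minimal standalone sketch of the before/after behavior — the broker names are invented for illustration:

    import itertools

    brokers = ['amqp-1', 'amqp-2', 'amqp-3']

    # Before: the scan position was rebuilt on each reconnect, so the
    # first candidate was always brokers[0], even if it had just failed.
    attempt = 0
    print(brokers[attempt % len(brokers)])      # 'amqp-1', every time

    # After: a cycle iterator created once per connection remembers where
    # the previous attempt stopped, so the next reconnect moves on.
    next_broker_indices = itertools.cycle(range(len(brokers)))
    print(brokers[next(next_broker_indices)])   # 'amqp-1'
    print(brokers[next(next_broker_indices)])   # 'amqp-2'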
@@ -0,0 +1,17 @@
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import six
+
+
+six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
@@ -23,11 +23,11 @@ Usual usage in an openstack.common module:
 """
 
 import copy
+import functools
 import gettext
 import locale
 from logging import handlers
 import os
-import re
 
 from babel import localedata
 import six
@@ -35,6 +35,17 @@ import six
 _localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
 _t = gettext.translation('neutron', localedir=_localedir, fallback=True)
 
+# We use separate translation catalogs for each log level, so set up a
+# mapping between the log level name and the translator. The domain
+# for the log level is project_name + "-log-" + log_level so messages
+# for each level end up in their own catalog.
+_t_log_levels = dict(
+    (level, gettext.translation('neutron' + '-log-' + level,
+                                localedir=_localedir,
+                                fallback=True))
+    for level in ['info', 'warning', 'error', 'critical']
+)
+
 _AVAILABLE_LANGUAGES = {}
 USE_LAZY = False
@@ -60,6 +71,28 @@ def _(msg):
     return _t.ugettext(msg)
 
 
+def _log_translation(msg, level):
+    """Build a single translation of a log message
+    """
+    if USE_LAZY:
+        return Message(msg, domain='neutron' + '-log-' + level)
+    else:
+        translator = _t_log_levels[level]
+        if six.PY3:
+            return translator.gettext(msg)
+        return translator.ugettext(msg)
+
+# Translators for log levels.
+#
+# The abbreviated names are meant to reflect the usual use of a short
+# name like '_'. The "L" is for "log" and the other letter comes from
+# the level.
+_LI = functools.partial(_log_translation, level='info')
+_LW = functools.partial(_log_translation, level='warning')
+_LE = functools.partial(_log_translation, level='error')
+_LC = functools.partial(_log_translation, level='critical')
+
+
 def install(domain, lazy=False):
     """Install a _() function using the given translation domain.
 
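The new helpers above give each log level its own gettext domain ('neutron-log-info', 'neutron-log-error', and so on), so operator-facing log messages can be translated independently of user-facing API strings. A hypothetical usage sketch — the logger setup and messages are illustrative; only the marker names come from the hunk above:

    import logging

    from neutron.openstack.common.gettextutils import _, _LE, _LI

    LOG = logging.getLogger(__name__)


    def report(agent_id):
        # Each marker routes its msgid through the matching
        # 'neutron-log-<level>' catalog.
        LOG.info(_LI("Agent %s started"), agent_id)
        LOG.error(_LE("Agent %s failed to report state"), agent_id)
        # Plain _() remains the marker for user-facing text.
        raise RuntimeError(_("agent error"))

Much of the rest of this diff is a mechanical application of that convention: translated log calls move from _() to the level-specific marker, and debug-level calls drop translation entirely (debug messages get no catalog).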
@@ -118,7 +151,8 @@ class Message(six.text_type):
     and can be treated as such.
     """
 
-    def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+    def __new__(cls, msgid, msgtext=None, params=None,
+                domain='neutron', *args):
         """Create a new Message object.
 
         In order for translation to work gettext requires a message ID, this
@@ -213,47 +247,22 @@ class Message(six.text_type):
         if other is None:
             params = (other,)
         elif isinstance(other, dict):
-            params = self._trim_dictionary_parameters(other)
+            # Merge the dictionaries
+            # Copy each item in case one does not support deep copy.
+            params = {}
+            if isinstance(self.params, dict):
+                for key, val in self.params.items():
+                    params[key] = self._copy_param(val)
+            for key, val in other.items():
+                params[key] = self._copy_param(val)
         else:
             params = self._copy_param(other)
         return params
 
-    def _trim_dictionary_parameters(self, dict_param):
-        """Return a dict that only has matching entries in the msgid."""
-        # NOTE(luisg): Here we trim down the dictionary passed as parameters
-        # to avoid carrying a lot of unnecessary weight around in the message
-        # object, for example if someone passes in Message() % locals() but
-        # only some params are used, and additionally we prevent errors for
-        # non-deepcopyable objects by unicoding() them.
-
-        # Look for %(param) keys in msgid;
-        # Skip %% and deal with the case where % is first character on the line
-        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
-
-        # If we don't find any %(param) keys but have a %s
-        if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
-            # Apparently the full dictionary is the parameter
-            params = self._copy_param(dict_param)
-        else:
-            params = {}
-            # Save our existing parameters as defaults to protect
-            # ourselves from losing values if we are called through an
-            # (erroneous) chain that builds a valid Message with
-            # arguments, and then does something like "msg % kwds"
-            # where kwds is an empty dictionary.
-            src = {}
-            if isinstance(self.params, dict):
-                src.update(self.params)
-            src.update(dict_param)
-            for key in keys:
-                params[key] = self._copy_param(src[key])
-
-        return params
-
     def _copy_param(self, param):
         try:
             return copy.deepcopy(param)
-        except TypeError:
+        except Exception:
             # Fallback to casting to unicode this will handle the
             # python code-like objects that can't be deep-copied
             return six.text_type(param)
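With the trimming logic gone, Message % dict now simply deep-copies and merges: existing params are kept as defaults, the right-hand dict wins on conflicts, and values that cannot be deep-copied fall back to their unicode form. A standalone sketch of that rule, lifted out of the Message class for illustration:

    import copy

    import six


    def _copy_param(param):
        try:
            return copy.deepcopy(param)
        except Exception:
            # e.g. tracebacks and other code-like objects
            return six.text_type(param)


    def merge_params(existing, other):
        params = {}
        if isinstance(existing, dict):
            for key, val in existing.items():
                params[key] = _copy_param(val)
        for key, val in other.items():
            params[key] = _copy_param(val)
        return params


    print(merge_params({'a': 1}, {'a': 2, 'b': [3]}))  # {'a': 2, 'b': [3]}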
@@ -297,9 +306,27 @@ def get_available_languages(domain):
     list_identifiers = (getattr(localedata, 'list', None) or
                         getattr(localedata, 'locale_identifiers'))
     locale_identifiers = list_identifiers()
 
     for i in locale_identifiers:
         if find(i) is not None:
             language_list.append(i)
 
+    # NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
+    # locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
+    # are perfectly legitimate locales:
+    #     https://github.com/mitsuhiko/babel/issues/37
+    # In Babel 1.3 they fixed the bug and they support these locales, but
+    # they are still not explicitly "listed" by locale_identifiers().
+    # That is why we add the locales here explicitly if necessary so that
+    # they are listed as supported.
+    aliases = {'zh': 'zh_CN',
+               'zh_Hant_HK': 'zh_HK',
+               'zh_Hant': 'zh_TW',
+               'fil': 'tl_PH'}
+    for (locale, alias) in six.iteritems(aliases):
+        if locale in language_list and alias not in language_list:
+            language_list.append(alias)
+
     _AVAILABLE_LANGUAGES[domain] = language_list
     return copy.copy(language_list)
@@ -23,13 +23,9 @@ For some wrappers that add message versioning to rpc, see:
     rpc.proxy
 """
 
-import inspect
-
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
-from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 
 
@@ -93,24 +89,7 @@ def create_connection(new=True):
     return _get_impl().create_connection(CONF, new=new)
 
 
-def _check_for_lock():
-    if not CONF.debug:
-        return None
-
-    if ((hasattr(local.strong_store, 'locks_held')
-         and local.strong_store.locks_held)):
-        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
-        LOG.warn(_('A RPC is being made while holding a lock. The locks '
-                   'currently held are %(locks)s. This is probably a bug. '
-                   'Please report it. Include the following: [%(stack)s].'),
-                 {'locks': local.strong_store.locks_held,
-                  'stack': stack})
-        return True
-
-    return False
-
-
-def call(context, topic, msg, timeout=None, check_for_lock=False):
+def call(context, topic, msg, timeout=None):
     """Invoke a remote method that returns something.
 
     :param context: Information that identifies the user that has made this
@@ -124,16 +103,12 @@ def call(context, topic, msg, timeout=None, check_for_lock=False):
               "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: A dict from the remote method.
 
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().call(CONF, context, topic, msg, timeout)
 
 
@@ -176,7 +151,7 @@ def fanout_cast(context, topic, msg):
     return _get_impl().fanout_cast(CONF, context, topic, msg)
 
 
-def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+def multicall(context, topic, msg, timeout=None):
    """Invoke a remote method and get back an iterator.
 
     In this case, the remote method will be returning multiple values in
@@ -194,8 +169,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
               "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: An iterator. The iterator will yield a tuple (N, X) where N is
               an index that starts at 0 and increases by one for each value
@@ -205,8 +178,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().multicall(CONF, context, topic, msg, timeout)
@@ -37,7 +37,7 @@ import six
 
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import common as rpc_common
@@ -72,7 +72,7 @@ class Pool(pools.Pool):
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
-        LOG.debug(_('Pool creating new connection'))
+        LOG.debug('Pool creating new connection')
         return self.connection_cls(self.conf)
 
     def empty(self):
@@ -287,7 +287,7 @@ def unpack_context(conf, msg):
     context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
-    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+    rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
     return ctx
 
 
@@ -339,7 +339,7 @@ def _add_unique_id(msg):
     """Add unique_id for checking duplicate messages."""
     unique_id = uuid.uuid4().hex
     msg.update({UNIQUE_ID: unique_id})
-    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+    LOG.debug('UNIQUE_ID is %s.' % (unique_id))
 
 
 class _ThreadPoolWithWait(object):
@@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         # the previous context is stored in local.store.context
         if hasattr(local.store, 'context'):
             del local.store.context
-        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        rpc_common._safe_log(LOG.debug, 'received %s', message_data)
         self.msg_id_cache.check_duplicate_message(message_data)
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
@@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait):
             # This final None tells multicall that it is done.
             ctxt.reply(ending=True, connection_pool=self.connection_pool)
         except rpc_common.ClientException as e:
-            LOG.debug(_('Expected exception during message handling (%s)') %
+            LOG.debug('Expected exception during message handling (%s)' %
                       e._exc_info[1])
             ctxt.reply(None, e._exc_info,
                        connection_pool=self.connection_pool,
@@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         except Exception:
             # sys.exc_info() is deleted by LOG.exception().
             exc_info = sys.exc_info()
-            LOG.error(_('Exception during message handling'),
+            LOG.error(_LE('Exception during message handling'),
                       exc_info=exc_info)
             ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
 
@@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore()
 
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
-    LOG.debug(_('Making synchronous call on %s ...'), topic)
+    LOG.debug('Making synchronous call on %s ...', topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
-    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    LOG.debug('MSG_ID is %s' % (msg_id))
     _add_unique_id(msg)
     pack_context(msg, context)
 
@@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
 
 def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
-    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    LOG.debug('Making asynchronous cast on %s...', topic)
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool):
 
 def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
-    LOG.debug(_('Making asynchronous fanout cast...'))
+    LOG.debug('Making asynchronous fanout cast...')
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
 
 def notify(conf, context, topic, msg, connection_pool, envelope):
     """Sends a notification event on a topic."""
-    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+    LOG.debug('Sending %(event_type)s on %(topic)s',
              dict(event_type=msg.get('event_type'),
                    topic=topic))
     _add_unique_id(msg)
@@ -22,7 +22,7 @@ import traceback
 from oslo.config import cfg
 import six
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import local
@@ -85,7 +85,7 @@ class RPCException(Exception):
             except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
-                LOG.exception(_('Exception in string format operation'))
+                LOG.exception(_LE('Exception in string format operation'))
                 for name, value in six.iteritems(kwargs):
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
@@ -269,6 +269,10 @@ def _safe_log(log_func, msg, msg_data):
                 d[k] = '<SANITIZED>'
             elif k.lower() in SANITIZE:
                 d[k] = '<SANITIZED>'
+            elif isinstance(d[k], list):
+                for e in d[k]:
+                    if isinstance(e, dict):
+                        _fix_passwords(e)
             elif isinstance(d[k], dict):
                 _fix_passwords(d[k])
         return d
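The new branch above makes sanitization recurse into lists of dicts, not just nested dicts, so credentials buried inside, say, a list of payload dicts are masked too. A self-contained sketch of the same recursion — the SANITIZE set here is illustrative; the real module defines its own:

    SANITIZE = {'password', 'auth_token'}


    def fix_passwords(d):
        for k in list(d.keys()):
            if k.lower() in SANITIZE:
                d[k] = '<SANITIZED>'
            elif isinstance(d[k], list):
                for e in d[k]:
                    if isinstance(e, dict):
                        fix_passwords(e)
            elif isinstance(d[k], dict):
                fix_passwords(d[k])
        return d


    msg = {'args': [{'password': 's3cret'}, {'topic': 'network'}]}
    print(fix_passwords(msg))  # the password inside the list is masked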
@@ -285,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
     if log_failure:
-        LOG.error(_("Returning exception %s to caller"),
+        LOG.error(_LE("Returning exception %s to caller"),
                   six.text_type(failure))
         LOG.error(tb)
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """Fake RPC implementation which calls proxy methods directly with no
 queues. Casts will block, but this is very useful for tests.
 """
@@ -139,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     try:
         consumer = CONSUMERS[topic][0]
@@ -184,8 +185,8 @@ def fanout_cast(conf, context, topic, msg):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     for consumer in CONSUMERS.get(topic, []):
         try:
@@ -29,7 +29,7 @@ from oslo.config import cfg
 import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import network_utils
 from neutron.openstack.common.rpc import amqp as rpc_amqp
 from neutron.openstack.common.rpc import common as rpc_common
@@ -38,9 +38,9 @@ from neutron.openstack.common import sslutils
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
-               help='SSL version to use (valid only if SSL enabled). '
-                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
-                    'be available on some distributions'
+               help='If SSL is enabled, the SSL version to use. Valid '
+                    'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
+                    'be available on some distributions.'
                ),
     cfg.StrOpt('kombu_ssl_keyfile',
                default='',
@@ -63,33 +63,33 @@ kombu_opts = [
                help='RabbitMQ HA cluster host:port pairs'),
     cfg.BoolOpt('rabbit_use_ssl',
                 default=False,
-                help='connect over SSL for RabbitMQ'),
+                help='Connect over SSL for RabbitMQ'),
     cfg.StrOpt('rabbit_userid',
                default='guest',
-               help='the RabbitMQ userid'),
+               help='The RabbitMQ userid'),
     cfg.StrOpt('rabbit_password',
                default='guest',
-               help='the RabbitMQ password',
+               help='The RabbitMQ password',
               secret=True),
     cfg.StrOpt('rabbit_virtual_host',
                default='/',
-               help='the RabbitMQ virtual host'),
+               help='The RabbitMQ virtual host'),
     cfg.IntOpt('rabbit_retry_interval',
                default=1,
-               help='how frequently to retry connecting with RabbitMQ'),
+               help='How frequently to retry connecting with RabbitMQ'),
     cfg.IntOpt('rabbit_retry_backoff',
                default=2,
-               help='how long to backoff for between retries when connecting '
+               help='How long to backoff for between retries when connecting '
                     'to RabbitMQ'),
     cfg.IntOpt('rabbit_max_retries',
                default=0,
-               help='maximum retries with trying to connect to RabbitMQ '
-                    '(the default of 0 implies an infinite retry count)'),
+               help='Maximum number of RabbitMQ connection retries. '
+                    'Default is 0 (infinite retry count)'),
     cfg.BoolOpt('rabbit_ha_queues',
                 default=False,
-                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
-                     'You need to wipe RabbitMQ database when '
-                     'changing this option.'),
+                help='Use HA queues in RabbitMQ (x-ha-policy: all). '
+                     'If you change this option, you must wipe the '
+                     'RabbitMQ database.'),
 
 ]
 
@@ -153,12 +153,12 @@ class ConsumerBase(object):
             callback(msg)
         except Exception:
             if self.ack_on_error:
-                LOG.exception(_("Failed to process message"
+                LOG.exception(_LE("Failed to process message"
                                 " ... skipping it."))
                 message.ack()
             else:
-                LOG.exception(_("Failed to process message"
+                LOG.exception(_LE("Failed to process message"
                                 " ... will requeue."))
                 message.requeue()
         else:
             message.ack()
@@ -458,6 +458,9 @@ class Connection(object):
 
         self.params_list = params_list
 
+        brokers_count = len(self.params_list)
+        self.next_broker_indices = itertools.cycle(range(brokers_count))
+
         self.memory_transport = self.conf.fake_rabbit
 
         self.connection = None
@@ -492,7 +495,7 @@ class Connection(object):
         be handled by the caller.
         """
         if self.connection:
-            LOG.info(_("Reconnecting to AMQP server on "
+            LOG.info(_LI("Reconnecting to AMQP server on "
                        "%(hostname)s:%(port)d") % params)
             try:
                 self.connection.release()
@@ -514,7 +517,7 @@ class Connection(object):
         self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
             consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+        LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
                  params)
 
     def reconnect(self):
@@ -528,7 +531,7 @@ class Connection(object):
 
         attempt = 0
         while True:
-            params = self.params_list[attempt % len(self.params_list)]
+            params = self.params_list[next(self.next_broker_indices)]
             attempt += 1
             try:
                 self._connect(params)
@@ -565,9 +568,9 @@ class Connection(object):
                 sleep_time = min(sleep_time, self.interval_max)
 
             log_info['sleep_time'] = sleep_time
-            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+            LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
                         'unreachable: %(err_str)s. Trying again in '
                         '%(sleep_time)d seconds.') % log_info)
             time.sleep(sleep_time)
 
     def ensure(self, error_callback, method, *args, **kwargs):
@@ -619,7 +622,7 @@ class Connection(object):
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s") % log_info)
 
         def _declare_consumer():
@@ -637,11 +640,11 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, socket.timeout):
-                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                LOG.debug('Timed out waiting for RPC response: %s' %
                           str(exc))
                 raise rpc_common.Timeout()
             else:
-                LOG.exception(_('Failed to consume message from queue: %s') %
+                LOG.exception(_LE('Failed to consume message from queue: %s') %
                               str(exc))
                 info['do_consume'] = True
@@ -680,7 +683,7 @@ class Connection(object):
 
         def _error_callback(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.exception(_("Failed to publish message to topic "
+            LOG.exception(_LE("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
 
         def _publish():
@@ -23,7 +23,7 @@ from oslo.config import cfg
 import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
@@ -188,7 +188,7 @@ class ConsumerBase(object):
             msg = rpc_common.deserialize_msg(message.content)
             self.callback(msg)
         except Exception:
-            LOG.exception(_("Failed to process message... skipping it."))
+            LOG.exception(_LE("Failed to process message... skipping it."))
         finally:
             # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
@@ -467,6 +467,10 @@ class Connection(object):
         self.brokers = params['qpid_hosts']
         self.username = params['username']
         self.password = params['password']
+
+        brokers_count = len(self.brokers)
+        self.next_broker_indices = itertools.cycle(range(brokers_count))
+
         self.connection_create(self.brokers[0])
         self.reconnect()
@@ -494,7 +498,6 @@ class Connection(object):
 
     def reconnect(self):
         """Handles reconnecting and re-establishing sessions and queues."""
-        attempt = 0
         delay = 1
         while True:
             # Close the session if necessary
@@ -504,21 +507,20 @@ class Connection(object):
             except qpid_exceptions.ConnectionError:
                 pass
 
-            broker = self.brokers[attempt % len(self.brokers)]
-            attempt += 1
+            broker = self.brokers[next(self.next_broker_indices)]
 
             try:
                 self.connection_create(broker)
                 self.connection.open()
             except qpid_exceptions.ConnectionError as e:
                 msg_dict = dict(e=e, delay=delay)
-                msg = _("Unable to connect to AMQP server: %(e)s. "
+                msg = _LE("Unable to connect to AMQP server: %(e)s. "
                         "Sleeping %(delay)s seconds") % msg_dict
                 LOG.error(msg)
                 time.sleep(delay)
-                delay = min(2 * delay, 60)
+                delay = min(delay + 1, 5)
             else:
-                LOG.info(_('Connected to AMQP server on %s'), broker)
+                LOG.info(_LI('Connected to AMQP server on %s'), broker)
                 break
 
         self.session = self.connection.session()
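This hunk is the "reduces delays when reconnecting to Qpid" fix from the commit message: the retry sleep changes from exponential backoff capped at 60 seconds to a linear ramp capped at 5, which matters when several HA brokers are configured and the next one in the cycle is likely healthy. The two schedules side by side, in a small sketch:

    def delays(step, n=6):
        out, delay = [], 1
        for i in range(n):
            out.append(delay)
            delay = step(delay)
        return out


    print(delays(lambda d: min(2 * d, 60)))  # old: [1, 2, 4, 8, 16, 32]
    print(delays(lambda d: min(d + 1, 5)))   # new: [1, 2, 3, 4, 5, 5]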
@@ -531,7 +533,7 @@ class Connection(object):
             consumer.reconnect(self.session)
             self._register_consumer(consumer)
 
-        LOG.debug(_("Re-established AMQP queues"))
+        LOG.debug("Re-established AMQP queues")
 
     def ensure(self, error_callback, method, *args, **kwargs):
         while True:
@@ -570,7 +572,7 @@ class Connection(object):
         """
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s") % log_info)
 
         def _declare_consumer():
@@ -585,11 +587,11 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
-                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                LOG.debug('Timed out waiting for RPC response: %s' %
                           str(exc))
                 raise rpc_common.Timeout()
             else:
-                LOG.exception(_('Failed to consume message from queue: %s') %
+                LOG.exception(_LE('Failed to consume message from queue: %s') %
                               str(exc))
 
         def _consume():
@@ -597,7 +599,7 @@ class Connection(object):
             try:
                 self._lookup_consumer(nxt_receiver).consume()
             except Exception:
-                LOG.exception(_("Error processing message. Skipping it."))
+                LOG.exception(_LE("Error processing message. Skipping it."))
 
         for iteration in itertools.count(0):
             if limit and iteration >= limit:
@@ -624,7 +626,7 @@ class Connection(object):
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.exception(_("Failed to publish message to topic "
+            LOG.exception(_LE("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
 
         def _publisher_send():
@@ -27,7 +27,7 @@ import six
 from six import moves
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common.rpc import common as rpc_common
@@ -80,7 +80,7 @@ CONF = cfg.CONF
 CONF.register_opts(zmq_opts)
 
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
-matchmaker = None  # memoized matchmaker object
+matchmaker = None  # memorized matchmaker object
 
 
 def _serialize(data):
@@ -93,12 +93,12 @@ def _serialize(data):
         return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
         with excutils.save_and_reraise_exception():
-            LOG.error(_("JSON serialization failed."))
+            LOG.error(_LE("JSON serialization failed."))
 
 
 def _deserialize(data):
     """Deserialization wrapper."""
-    LOG.debug(_("Deserializing: %s"), data)
+    LOG.debug("Deserializing: %s", data)
     return jsonutils.loads(data)
 
 
@@ -133,9 +133,9 @@ class ZmqSocket(object):
         str_data = {'addr': addr, 'type': self.socket_s(),
                     'subscribe': subscribe, 'bind': bind}
 
-        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
-        LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
-        LOG.debug(_("-> bind: %(bind)s"), str_data)
+        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
+        LOG.debug("-> Subscribed to %(subscribe)s", str_data)
+        LOG.debug("-> bind: %(bind)s", str_data)
 
         try:
             if bind:
@@ -155,7 +155,7 @@ class ZmqSocket(object):
         """Subscribe."""
         if not self.can_sub:
             raise RPCException("Cannot subscribe on this socket.")
-        LOG.debug(_("Subscribing to %s"), msg_filter)
+        LOG.debug("Subscribing to %s", msg_filter)
 
         try:
             self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
@@ -192,7 +192,7 @@ class ZmqSocket(object):
             # it would be much worse if some of the code calling this
             # were to fail. For now, lets log, and later evaluate
             # if we can safely raise here.
-            LOG.error(_("ZeroMQ socket could not be closed."))
+            LOG.error(_LE("ZeroMQ socket could not be closed."))
         self.sock = None
 
     def recv(self, **kwargs):
@@ -264,7 +264,7 @@ class InternalContext(object):
 
     def _get_response(self, ctx, proxy, topic, data):
         """Process a curried message and cast the result to topic."""
-        LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+        LOG.debug("Running func with context: %s", ctx.to_dict())
         data.setdefault('version', None)
         data.setdefault('args', {})
 
@@ -277,13 +277,13 @@ class InternalContext(object):
             # ignore these since they are just from shutdowns
             pass
         except rpc_common.ClientException as e:
-            LOG.debug(_("Expected exception during message handling (%s)") %
+            LOG.debug("Expected exception during message handling (%s)" %
                       e._exc_info[1])
             return {'exc':
                     rpc_common.serialize_remote_exception(e._exc_info,
                                                           log_failure=False)}
         except Exception:
-            LOG.error(_("Exception during message handling"))
+            LOG.error(_LE("Exception during message handling"))
             return {'exc':
                     rpc_common.serialize_remote_exception(sys.exc_info())}
 
@@ -302,7 +302,7 @@ class InternalContext(object):
                 self._get_response(ctx, proxy, topic, payload),
                 ctx.replies)
 
-            LOG.debug(_("Sending reply"))
+            LOG.debug("Sending reply")
             _multi_send(_cast, ctx, topic, {
                 'method': '-process_reply',
                 'args': {
@@ -336,7 +336,7 @@ class ConsumerBase(object):
         # processed internally. (non-valid method name)
         method = data.get('method')
         if not method:
-            LOG.error(_("RPC message did not include method."))
+            LOG.error(_LE("RPC message did not include method."))
             return
 
         # Internal method
@@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase):
     def register(self, proxy, in_addr, zmq_type_in,
                  in_bind=True, subscribe=None):
 
-        LOG.info(_("Registering reactor"))
+        LOG.info(_LI("Registering reactor"))
 
         if zmq_type_in not in (zmq.PULL, zmq.SUB):
             raise RPCException("Bad input socktype")
@@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase):
         self.proxies[inq] = proxy
         self.sockets.append(inq)
 
-        LOG.info(_("In reactor registered"))
+        LOG.info(_LI("In reactor registered"))
 
     def consume_in_thread(self):
         @excutils.forever_retry_uncaught_exceptions
         def _consume(sock):
-            LOG.info(_("Consuming socket"))
+            LOG.info(_LI("Consuming socket"))
             while True:
                 self.consume(sock)
 
@@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
 
         if topic not in self.topic_proxy:
             def publisher(waiter):
-                LOG.info(_("Creating proxy for topic: %s"), topic)
+                LOG.info(_LI("Creating proxy for topic: %s"), topic)
 
                 try:
                     # The topic is received over the network,
@@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor):
             try:
                 wait_sock_creation.wait()
             except RPCException:
-                LOG.error(_("Topic socket file creation failed."))
+                LOG.error(_LE("Topic socket file creation failed."))
                 return
 
         try:
             self.topic_proxy[topic].put_nowait(data)
         except eventlet.queue.Full:
-            LOG.error(_("Local per-topic backlog buffer full for topic "
-                        "%(topic)s. Dropping message.") % {'topic': topic})
+            LOG.error(_LE("Local per-topic backlog buffer full for topic "
+                          "%(topic)s. Dropping message.") % {'topic': topic})
 
     def consume_in_thread(self):
         """Runs the ZmqProxy service."""
@@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor):
         except os.error:
             if not os.path.isdir(ipc_dir):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Required IPC directory does not exist at"
-                                " %s") % (ipc_dir, ))
+                    LOG.error(_LE("Required IPC directory does not exist at"
+                                  " %s") % (ipc_dir, ))
         try:
             self.register(consumption_proxy,
                           consume_in,
@@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor):
         except zmq.ZMQError:
             if os.access(ipc_dir, os.X_OK):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Permission denied to IPC directory at"
-                                " %s") % (ipc_dir, ))
+                    LOG.error(_LE("Permission denied to IPC directory at"
+                                  " %s") % (ipc_dir, ))
             with excutils.save_and_reraise_exception():
-                LOG.error(_("Could not create ZeroMQ receiver daemon. "
-                            "Socket may already be in use."))
+                LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
+                              "Socket may already be in use."))
 
         super(ZmqProxy, self).consume_in_thread()
 
@@ -541,7 +541,7 @@ class ZmqReactor(ZmqBaseReactor):
     def consume(self, sock):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
        data = sock.recv()
-        LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+        LOG.debug("CONSUMER RECEIVED DATA: %s", data)
 
         proxy = self.proxies[sock]
 
@@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor):
             # Unmarshal only after verifying the message.
             ctx = RpcContext.unmarshal(data[3])
         else:
-            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+            LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
             return
 
         self.pool.spawn_n(self.process, proxy, ctx, request)
@@ -588,14 +588,14 @@ class Connection(rpc_common.Connection):
             topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
 
         if topic in self.topics:
-            LOG.info(_("Skipping topic registration. Already registered."))
+            LOG.info(_LI("Skipping topic registration. Already registered."))
             return
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
             (CONF.rpc_zmq_ipc_dir, topic)
 
-        LOG.debug(_("Consumer is a zmq.%s"),
+        LOG.debug("Consumer is a zmq.%s",
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
         self.reactor.register(proxy, inaddr, sock_type,
@@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None,
     # Replies always come into the reply service.
     reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
 
-    LOG.debug(_("Creating payload"))
+    LOG.debug("Creating payload")
     # Curry the original request into a reply method.
     mcontext = RpcContext.marshal(context)
     payload = {
@@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None,
         }
     }
 
-    LOG.debug(_("Creating queue socket for reply waiter"))
+    LOG.debug("Creating queue socket for reply waiter")
 
     # Messages arriving async.
     # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
@@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None,
             zmq.SUB, subscribe=msg_id, bind=False
         )
 
-        LOG.debug(_("Sending cast"))
+        LOG.debug("Sending cast")
         _cast(addr, context, topic, payload, envelope)
 
-        LOG.debug(_("Cast sent; Waiting reply"))
+        LOG.debug("Cast sent; Waiting reply")
         # Blocks until receives reply
         msg = msg_waiter.recv()
-        LOG.debug(_("Received message: %s"), msg)
-        LOG.debug(_("Unpacking response"))
+        LOG.debug("Received message: %s", msg)
+        LOG.debug("Unpacking response")
 
         if msg[2] == 'cast':  # Legacy version
             raw_msg = _deserialize(msg[-1])[-1]
@@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None,
     Dispatches to the matchmaker and sends message to all relevant hosts.
     """
     conf = CONF
-    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+    LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
     queues = _get_matchmaker().queues(topic)
-    LOG.debug(_("Sending message(s) to: %s"), queues)
+    LOG.debug("Sending message(s) to: %s", queues)
 
     # Don't stack if we have no matchmaker results
     if not queues:
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should except a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -21,7 +22,7 @@ import contextlib
 import eventlet
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LI
 from neutron.openstack.common import log as logging
 
 
@@ -212,7 +213,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
         self.hosts.discard(host)
         self.backend_unregister(key, '.'.join((key, host)))
 
-        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+        LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
                  {'key': key, 'host': host})
 
     def start_heartbeat(self):
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should accept a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should except a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -21,7 +22,7 @@ import json
 
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LW
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import matchmaker as mm
 
@@ -52,9 +53,8 @@ class RingExchange(mm.Exchange):
         if ring:
             self.ring = ring
         else:
-            fh = open(CONF.matchmaker_ring.ringfile, 'r')
-            self.ring = json.load(fh)
-            fh.close()
+            with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
+                self.ring = json.load(fh)
 
         self.ring0 = {}
         for k in self.ring.keys():
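The ringfile read above now uses a context manager, so the file handle is closed even if json.load() raises. For reference, the ringfile is a JSON mapping of topic keys to host lists; a sketch with made-up topics, hosts and path:

    import json

    with open('/tmp/matchmaker_ring.json.example', 'w') as fh:
        fh.write('{"network": ["host-a", "host-b"], "dhcp": ["host-c"]}')

    with open('/tmp/matchmaker_ring.json.example', 'r') as fh:
        ring = json.load(fh)

    print(ring['network'])  # ['host-a', 'host-b']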
@@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange):
     def run(self, key):
         if not self._ring_has(key):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
+                _LW("No key defining hosts for topic '%s', "
                   "see ringfile") % (key, )
             )
             return []
         host = next(self.ring0[key])
@@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange):
         nkey = key.split('fanout~')[1:][0]
         if not self._ring_has(nkey):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
+                _LW("No key defining hosts for topic '%s', "
                   "see ringfile") % (nkey, )
             )
             return []
         return map(lambda x: (key + '.' + x, x), self.ring[nkey])
@@ -15,7 +15,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -44,7 +43,7 @@ class Service(service.Service):
         super(Service, self).start()
 
         self.conn = rpc.create_connection(new=True)
-        LOG.debug(_("Creating Consumer connection for Service %s") %
+        LOG.debug("Creating Consumer connection for Service %s" %
                   self.topic)
 
         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],