Merge "Update with latest code from openstack-common (stable/folsom)"
This commit is contained in:
commit
7139cdd3aa
@ -367,6 +367,11 @@ class ConfigFileValueError(Error):
|
||||
pass
|
||||
|
||||
|
||||
def _fixpath(p):
|
||||
"""Apply tilde expansion and absolutization to a path."""
|
||||
return os.path.abspath(os.path.expanduser(p))
|
||||
|
||||
|
||||
def _get_config_dirs(project=None):
|
||||
"""Return a list of directors where config files may be located.
|
||||
|
||||
@ -384,11 +389,9 @@ def _get_config_dirs(project=None):
|
||||
~/
|
||||
/etc/
|
||||
"""
|
||||
fix_path = lambda p: os.path.abspath(os.path.expanduser(p))
|
||||
|
||||
cfg_dirs = [
|
||||
fix_path(os.path.join('~', '.' + project)) if project else None,
|
||||
fix_path('~'),
|
||||
_fixpath(os.path.join('~', '.' + project)) if project else None,
|
||||
_fixpath('~'),
|
||||
os.path.join('/etc', project) if project else None,
|
||||
'/etc'
|
||||
]
|
||||
@ -809,7 +812,7 @@ class OptGroup(object):
|
||||
if _is_opt_registered(self._opts, opt):
|
||||
return False
|
||||
|
||||
self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None}
|
||||
self._opts[opt.dest] = {'opt': opt}
|
||||
|
||||
return True
|
||||
|
||||
@ -1087,7 +1090,7 @@ class ConfigOpts(collections.Mapping):
|
||||
if _is_opt_registered(self._opts, opt):
|
||||
return False
|
||||
|
||||
self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None}
|
||||
self._opts[opt.dest] = {'opt': opt}
|
||||
|
||||
return True
|
||||
|
||||
@ -1205,6 +1208,33 @@ class ConfigOpts(collections.Mapping):
|
||||
opt_info = self._get_opt_info(name, group)
|
||||
opt_info['default'] = default
|
||||
|
||||
@__clear_cache
|
||||
def clear_override(self, name, group=None):
|
||||
"""Clear an override an opt value.
|
||||
|
||||
Clear a previously set override of the command line, config file
|
||||
and default values of a given option.
|
||||
|
||||
:param name: the name/dest of the opt
|
||||
:param group: an option OptGroup object or group name
|
||||
:raises: NoSuchOptError, NoSuchGroupError
|
||||
"""
|
||||
opt_info = self._get_opt_info(name, group)
|
||||
opt_info.pop('override', None)
|
||||
|
||||
@__clear_cache
|
||||
def clear_default(self, name, group=None):
|
||||
"""Clear an override an opt's default value.
|
||||
|
||||
Clear a previously set override of the default value of given option.
|
||||
|
||||
:param name: the name/dest of the opt
|
||||
:param group: an option OptGroup object or group name
|
||||
:raises: NoSuchOptError, NoSuchGroupError
|
||||
"""
|
||||
opt_info = self._get_opt_info(name, group)
|
||||
opt_info.pop('default', None)
|
||||
|
||||
def _all_opt_infos(self):
|
||||
"""A generator function for iteration opt infos."""
|
||||
for info in self._opts.values():
|
||||
@ -1221,8 +1251,8 @@ class ConfigOpts(collections.Mapping):
|
||||
def _unset_defaults_and_overrides(self):
|
||||
"""Unset any default or override on all options."""
|
||||
for info, group in self._all_opt_infos():
|
||||
info['default'] = None
|
||||
info['override'] = None
|
||||
info.pop('default', None)
|
||||
info.pop('override', None)
|
||||
|
||||
def disable_interspersed_args(self):
|
||||
"""Set parsing to stop on the first non-option.
|
||||
@ -1268,10 +1298,10 @@ class ConfigOpts(collections.Mapping):
|
||||
"""
|
||||
dirs = []
|
||||
if self.config_dir:
|
||||
dirs.append(self.config_dir)
|
||||
dirs.append(_fixpath(self.config_dir))
|
||||
|
||||
for cf in reversed(self.config_file):
|
||||
dirs.append(os.path.dirname(cf))
|
||||
dirs.append(os.path.dirname(_fixpath(cf)))
|
||||
|
||||
dirs.extend(_get_config_dirs(self.project))
|
||||
|
||||
@ -1345,10 +1375,10 @@ class ConfigOpts(collections.Mapping):
|
||||
return self.GroupAttr(self, self._get_group(name))
|
||||
|
||||
info = self._get_opt_info(name, group)
|
||||
default, opt, override = [info[k] for k in sorted(info.keys())]
|
||||
opt = info['opt']
|
||||
|
||||
if override is not None:
|
||||
return override
|
||||
if 'override' in info:
|
||||
return info['override']
|
||||
|
||||
values = []
|
||||
if self._cparser is not None:
|
||||
@ -1376,8 +1406,8 @@ class ConfigOpts(collections.Mapping):
|
||||
if values:
|
||||
return values
|
||||
|
||||
if default is not None:
|
||||
return default
|
||||
if 'default' in info:
|
||||
return info['default']
|
||||
|
||||
return opt.default
|
||||
|
||||
@ -1452,6 +1482,8 @@ class ConfigOpts(collections.Mapping):
|
||||
config_dir_glob = os.path.join(self.config_dir, '*.conf')
|
||||
config_files += sorted(glob.glob(config_dir_glob))
|
||||
|
||||
config_files = [_fixpath(p) for p in config_files]
|
||||
|
||||
self._cparser = MultiConfigParser()
|
||||
|
||||
try:
|
||||
@ -1469,10 +1501,10 @@ class ConfigOpts(collections.Mapping):
|
||||
:raises: RequiredOptError
|
||||
"""
|
||||
for info, group in self._all_opt_infos():
|
||||
default, opt, override = [info[k] for k in sorted(info.keys())]
|
||||
opt = info['opt']
|
||||
|
||||
if opt.required:
|
||||
if (default is not None or override is not None):
|
||||
if ('default' in info or 'override' in info):
|
||||
continue
|
||||
|
||||
if self._get(opt.name, group) is None:
|
||||
|
@ -30,14 +30,14 @@ def save_and_reraise_exception():
|
||||
"""Save current exception, run some code and then re-raise.
|
||||
|
||||
In some cases the exception context can be cleared, resulting in None
|
||||
being attempted to be reraised after an exception handler is run. This
|
||||
being attempted to be re-raised after an exception handler is run. This
|
||||
can happen when eventlet switches greenthreads or when running an
|
||||
exception handler, code raises and catches an exception. In both
|
||||
cases the exception context will be cleared.
|
||||
|
||||
To work around this, we save the exception state, run handler code, and
|
||||
then re-raise the original exception. If another exception occurs, the
|
||||
saved exception is logged and the new exception is reraised.
|
||||
saved exception is logged and the new exception is re-raised.
|
||||
"""
|
||||
type_, value, tb = sys.exc_info()
|
||||
try:
|
||||
|
@ -29,7 +29,7 @@ def import_class(import_str):
|
||||
try:
|
||||
__import__(mod_str)
|
||||
return getattr(sys.modules[mod_str], class_str)
|
||||
except (ImportError, ValueError, AttributeError), exc:
|
||||
except (ValueError, AttributeError), exc:
|
||||
raise ImportError('Class %s cannot be found (%s)' %
|
||||
(class_str,
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
|
@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import uuid
|
||||
|
||||
from quantum.openstack.common import cfg
|
||||
@ -140,9 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload):
|
||||
driver.notify(context, msg)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to "
|
||||
"send to notification system. "
|
||||
"Payload=%(payload)s") %
|
||||
locals())
|
||||
"send to notification system. Payload=%(payload)s") %
|
||||
locals())
|
||||
|
||||
|
||||
_drivers = None
|
||||
@ -171,7 +169,7 @@ def add_driver(notification_driver):
|
||||
except ImportError as e:
|
||||
LOG.exception(_("Failed to load notifier %s. "
|
||||
"These notifications will not be sent.") %
|
||||
notification_driver)
|
||||
notification_driver)
|
||||
else:
|
||||
# Driver is already loaded; just add the object.
|
||||
_drivers[notification_driver] = notification_driver
|
||||
|
@ -173,7 +173,7 @@ class Brain(object):
|
||||
LOG.warning(_("Inheritance-based rules are deprecated; update "
|
||||
"_check_%s") % match_kind)
|
||||
func = (lambda brain, kind, value, target, cred:
|
||||
old_func(value, target, cred))
|
||||
old_func(value, target, cred))
|
||||
|
||||
if not func:
|
||||
LOG.error(_("No handler for matches of kind %s") % match_kind)
|
||||
@ -296,5 +296,5 @@ def _check_generic(brain, match_kind, match, target_dict, cred_dict):
|
||||
# TODO(termie): do dict inspection via dot syntax
|
||||
match = match % target_dict
|
||||
if match_kind in cred_dict:
|
||||
return match == cred_dict[match_kind]
|
||||
return match == unicode(cred_dict[match_kind])
|
||||
return False
|
||||
|
@ -19,10 +19,8 @@
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
from quantum.openstack.common import cfg
|
||||
from quantum.openstack.common.gettextutils import _
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import jsonutils
|
||||
|
@ -72,7 +72,7 @@ zmq_opts = [
|
||||
|
||||
# These globals are defined in register_opts(conf),
|
||||
# a mandatory initialization call
|
||||
FLAGS = None
|
||||
CONF = None
|
||||
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
||||
matchmaker = None # memoized matchmaker object
|
||||
|
||||
@ -274,7 +274,7 @@ class InternalContext(object):
|
||||
ctx.replies)
|
||||
|
||||
LOG.debug(_("Sending reply"))
|
||||
cast(FLAGS, ctx, topic, {
|
||||
cast(CONF, ctx, topic, {
|
||||
'method': '-process_reply',
|
||||
'args': {
|
||||
'msg_id': msg_id,
|
||||
@ -329,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
def __init__(self, conf):
|
||||
super(ZmqBaseReactor, self).__init__()
|
||||
|
||||
self.conf = conf
|
||||
self.mapping = {}
|
||||
self.proxies = {}
|
||||
self.threads = []
|
||||
@ -405,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
super(ZmqProxy, self).__init__(conf)
|
||||
|
||||
self.topic_proxy = {}
|
||||
ipc_dir = conf.rpc_zmq_ipc_dir
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
self.topic_proxy['zmq_replies'] = \
|
||||
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
|
||||
@ -413,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
self.sockets.append(self.topic_proxy['zmq_replies'])
|
||||
|
||||
def consume(self, sock):
|
||||
ipc_dir = self.conf.rpc_zmq_ipc_dir
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
@ -487,7 +486,6 @@ class Connection(rpc_common.Connection):
|
||||
"""Manages connections and threads."""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.reactor = ZmqReactor(conf)
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
@ -508,7 +506,7 @@ class Connection(rpc_common.Connection):
|
||||
|
||||
# Receive messages from (local) proxy
|
||||
inaddr = "ipc://%s/zmq_topic_%s" % \
|
||||
(self.conf.rpc_zmq_ipc_dir, topic)
|
||||
(CONF.rpc_zmq_ipc_dir, topic)
|
||||
|
||||
LOG.debug(_("Consumer is a zmq.%s"),
|
||||
['PULL', 'SUB'][sock_type == zmq.SUB])
|
||||
@ -527,7 +525,7 @@ class Connection(rpc_common.Connection):
|
||||
|
||||
|
||||
def _cast(addr, context, msg_id, topic, msg, timeout=None):
|
||||
timeout_cast = timeout or FLAGS.rpc_cast_timeout
|
||||
timeout_cast = timeout or CONF.rpc_cast_timeout
|
||||
payload = [RpcContext.marshal(context), msg]
|
||||
|
||||
with Timeout(timeout_cast, exception=rpc_common.Timeout):
|
||||
@ -545,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
|
||||
|
||||
def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
# timeout_response is how long we wait for a response
|
||||
timeout = timeout or FLAGS.rpc_response_timeout
|
||||
timeout = timeout or CONF.rpc_response_timeout
|
||||
|
||||
# The msg_id is used to track replies.
|
||||
msg_id = str(uuid.uuid4().hex)
|
||||
|
||||
# Replies always come into the reply service.
|
||||
reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
|
||||
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
|
||||
|
||||
LOG.debug(_("Creating payload"))
|
||||
# Curry the original request into a reply method.
|
||||
@ -573,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
with Timeout(timeout, exception=rpc_common.Timeout):
|
||||
try:
|
||||
msg_waiter = ZmqSocket(
|
||||
"ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
|
||||
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
|
||||
zmq.SUB, subscribe=msg_id, bind=False
|
||||
)
|
||||
|
||||
@ -599,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
# responses for Exceptions.
|
||||
for resp in responses:
|
||||
if isinstance(resp, types.DictType) and 'exc' in resp:
|
||||
raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
|
||||
raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
|
||||
|
||||
return responses[-1]
|
||||
|
||||
@ -610,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
|
||||
dispatches to the matchmaker and sends
|
||||
message to all relevant hosts.
|
||||
"""
|
||||
conf = FLAGS
|
||||
conf = CONF
|
||||
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
|
||||
|
||||
queues = matchmaker.queues(topic)
|
||||
@ -641,26 +639,22 @@ def create_connection(conf, new=True):
|
||||
|
||||
def multicall(conf, *args, **kwargs):
|
||||
"""Multiple calls."""
|
||||
register_opts(conf)
|
||||
return _multi_send(_call, *args, **kwargs)
|
||||
|
||||
|
||||
def call(conf, *args, **kwargs):
|
||||
"""Send a message, expect a response."""
|
||||
register_opts(conf)
|
||||
data = _multi_send(_call, *args, **kwargs)
|
||||
return data[-1]
|
||||
|
||||
|
||||
def cast(conf, *args, **kwargs):
|
||||
"""Send a message expecting no reply."""
|
||||
register_opts(conf)
|
||||
_multi_send(_cast, *args, **kwargs)
|
||||
|
||||
|
||||
def fanout_cast(conf, context, topic, msg, **kwargs):
|
||||
"""Send a message to all listening and expect no reply."""
|
||||
register_opts(conf)
|
||||
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
|
||||
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
|
||||
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
|
||||
@ -672,7 +666,6 @@ def notify(conf, context, topic, msg, **kwargs):
|
||||
Notifications are sent to topic-priority.
|
||||
This differs from the AMQP drivers which send to topic.priority.
|
||||
"""
|
||||
register_opts(conf)
|
||||
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||
# work with our assumptions.
|
||||
topic.replace('.', '-')
|
||||
@ -684,7 +677,7 @@ def cleanup():
|
||||
global ZMQ_CTX
|
||||
global matchmaker
|
||||
matchmaker = None
|
||||
ZMQ_CTX.destroy()
|
||||
ZMQ_CTX.term()
|
||||
ZMQ_CTX = None
|
||||
|
||||
|
||||
@ -697,11 +690,11 @@ def register_opts(conf):
|
||||
# We memoize through these globals
|
||||
global ZMQ_CTX
|
||||
global matchmaker
|
||||
global FLAGS
|
||||
global CONF
|
||||
|
||||
if not FLAGS:
|
||||
if not CONF:
|
||||
conf.register_opts(zmq_opts)
|
||||
FLAGS = conf
|
||||
CONF = conf
|
||||
# Don't re-set, if this method is called twice.
|
||||
if not ZMQ_CTX:
|
||||
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
|
||||
|
@ -93,13 +93,13 @@ def set_time_override(override_time=datetime.datetime.utcnow()):
|
||||
|
||||
|
||||
def advance_time_delta(timedelta):
|
||||
"""Advance overriden time using a datetime.timedelta."""
|
||||
"""Advance overridden time using a datetime.timedelta."""
|
||||
assert(not utcnow.override_time is None)
|
||||
utcnow.override_time += timedelta
|
||||
|
||||
|
||||
def advance_time_seconds(seconds):
|
||||
"""Advance overriden time by seconds."""
|
||||
"""Advance overridden time by seconds."""
|
||||
advance_time_delta(datetime.timedelta(0, seconds))
|
||||
|
||||
|
||||
@ -122,6 +122,5 @@ def marshall_now(now=None):
|
||||
def unmarshall_time(tyme):
|
||||
"""Unmarshall a datetime dict."""
|
||||
return datetime.datetime(day=tyme['day'], month=tyme['month'],
|
||||
year=tyme['year'], hour=tyme['hour'],
|
||||
minute=tyme['minute'], second=tyme['second'],
|
||||
microsecond=tyme['microsecond'])
|
||||
year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
|
||||
second=tyme['second'], microsecond=tyme['microsecond'])
|
||||
|
@ -61,10 +61,12 @@ class ClusterConfigOptions(cfg.CommonConfigOpts):
|
||||
"""
|
||||
if group is None and name in self._groups:
|
||||
return self.GroupAttr(self, self._get_group(name))
|
||||
|
||||
info = self._get_opt_info(name, group)
|
||||
default, opt, override = [info[k] for k in sorted(info.keys())]
|
||||
if override is not None:
|
||||
return override
|
||||
opt = info['opt']
|
||||
|
||||
if 'override' in info:
|
||||
return info['override']
|
||||
|
||||
values = []
|
||||
if self._cparser is not None:
|
||||
@ -96,8 +98,8 @@ class ClusterConfigOptions(cfg.CommonConfigOpts):
|
||||
if values:
|
||||
return values
|
||||
|
||||
if default is not None:
|
||||
return default
|
||||
if 'default' in info:
|
||||
return info['default']
|
||||
|
||||
return opt.default
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user