From 2d0da07be277ac2a9b411ecec3c88633ee5d5c5d Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Mon, 3 Sep 2012 13:55:35 +1000 Subject: [PATCH] Update openstack-common code to latest. Change-Id: I63d32f70176af7ea6accf4c48c4fc8b646a7e31e Signed-off-by: Angus Salkeld --- ceilometer/openstack/common/cfg.py | 85 ++++++++++++++++----- ceilometer/openstack/common/excutils.py | 4 +- ceilometer/openstack/common/importutils.py | 2 +- ceilometer/openstack/common/notifier/api.py | 1 - ceilometer/openstack/common/rpc/common.py | 2 - ceilometer/openstack/common/rpc/impl_zmq.py | 37 ++++----- ceilometer/openstack/common/timeutils.py | 22 +++++- 7 files changed, 106 insertions(+), 47 deletions(-) diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py index f710d11e6..d8d4c8d9c 100644 --- a/ceilometer/openstack/common/cfg.py +++ b/ceilometer/openstack/common/cfg.py @@ -367,6 +367,11 @@ class ConfigFileValueError(Error): pass +def _fixpath(p): + """Apply tilde expansion and absolutization to a path.""" + return os.path.abspath(os.path.expanduser(p)) + + def _get_config_dirs(project=None): """Return a list of directors where config files may be located. @@ -384,11 +389,9 @@ def _get_config_dirs(project=None): ~/ /etc/ """ - fix_path = lambda p: os.path.abspath(os.path.expanduser(p)) - cfg_dirs = [ - fix_path(os.path.join('~', '.' + project)) if project else None, - fix_path('~'), + _fixpath(os.path.join('~', '.' + project)) if project else None, + _fixpath('~'), os.path.join('/etc', project) if project else None, '/etc' ] @@ -809,7 +812,7 @@ class OptGroup(object): if _is_opt_registered(self._opts, opt): return False - self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None} + self._opts[opt.dest] = {'opt': opt} return True @@ -1087,7 +1090,7 @@ class ConfigOpts(collections.Mapping): if _is_opt_registered(self._opts, opt): return False - self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None} + self._opts[opt.dest] = {'opt': opt} return True @@ -1156,6 +1159,25 @@ class ConfigOpts(collections.Mapping): for opt in opts: self.unregister_opt(opt, group, clear_cache=False) + def import_opt(self, name, module_str, group=None): + """Import an option definition from a module. + + Import a module and check that a given option is registered. + + This is intended for use with global configuration objects + like cfg.CONF where modules commonly register options with + CONF at module load time. If one module requires an option + defined by another module it can use this method to explicitly + declare the dependency. + + :param name: the name/dest of the opt + :param module_str: the name of a module to import + :param group: an option OptGroup object or group name + :raises: NoSuchOptError, NoSuchGroupError + """ + __import__(module_str) + self._get_opt_info(name, group) + @__clear_cache def set_override(self, name, override, group=None): """Override an opt value. @@ -1186,6 +1208,33 @@ class ConfigOpts(collections.Mapping): opt_info = self._get_opt_info(name, group) opt_info['default'] = default + @__clear_cache + def clear_override(self, name, group=None): + """Clear an override an opt value. + + Clear a previously set override of the command line, config file + and default values of a given option. 
+ + :param name: the name/dest of the opt + :param group: an option OptGroup object or group name + :raises: NoSuchOptError, NoSuchGroupError + """ + opt_info = self._get_opt_info(name, group) + opt_info.pop('override', None) + + @__clear_cache + def clear_default(self, name, group=None): + """Clear an override an opt's default value. + + Clear a previously set override of the default value of given option. + + :param name: the name/dest of the opt + :param group: an option OptGroup object or group name + :raises: NoSuchOptError, NoSuchGroupError + """ + opt_info = self._get_opt_info(name, group) + opt_info.pop('default', None) + def _all_opt_infos(self): """A generator function for iteration opt infos.""" for info in self._opts.values(): @@ -1202,8 +1251,8 @@ class ConfigOpts(collections.Mapping): def _unset_defaults_and_overrides(self): """Unset any default or override on all options.""" for info, group in self._all_opt_infos(): - info['default'] = None - info['override'] = None + info.pop('default', None) + info.pop('override', None) def disable_interspersed_args(self): """Set parsing to stop on the first non-option. @@ -1249,10 +1298,10 @@ class ConfigOpts(collections.Mapping): """ dirs = [] if self.config_dir: - dirs.append(self.config_dir) + dirs.append(_fixpath(self.config_dir)) for cf in reversed(self.config_file): - dirs.append(os.path.dirname(cf)) + dirs.append(os.path.dirname(_fixpath(cf))) dirs.extend(_get_config_dirs(self.project)) @@ -1326,10 +1375,10 @@ class ConfigOpts(collections.Mapping): return self.GroupAttr(self, self._get_group(name)) info = self._get_opt_info(name, group) - default, opt, override = [info[k] for k in sorted(info.keys())] + opt = info['opt'] - if override is not None: - return override + if 'override' in info: + return info['override'] values = [] if self._cparser is not None: @@ -1357,8 +1406,8 @@ class ConfigOpts(collections.Mapping): if values: return values - if default is not None: - return default + if 'default' in info: + return info['default'] return opt.default @@ -1433,6 +1482,8 @@ class ConfigOpts(collections.Mapping): config_dir_glob = os.path.join(self.config_dir, '*.conf') config_files += sorted(glob.glob(config_dir_glob)) + config_files = [_fixpath(p) for p in config_files] + self._cparser = MultiConfigParser() try: @@ -1450,10 +1501,10 @@ class ConfigOpts(collections.Mapping): :raises: RequiredOptError """ for info, group in self._all_opt_infos(): - default, opt, override = [info[k] for k in sorted(info.keys())] + opt = info['opt'] if opt.required: - if (default is not None or override is not None): + if ('default' in info or 'override' in info): continue if self._get(opt.name, group) is None: diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index 67c9fa951..5dd483017 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -30,14 +30,14 @@ def save_and_reraise_exception(): """Save current exception, run some code and then re-raise. In some cases the exception context can be cleared, resulting in None - being attempted to be reraised after an exception handler is run. This + being attempted to be re-raised after an exception handler is run. This can happen when eventlet switches greenthreads or when running an exception handler, code raises and catches an exception. In both cases the exception context will be cleared. To work around this, we save the exception state, run handler code, and then re-raise the original exception. 
If another exception occurs, the - saved exception is logged and the new exception is reraised. + saved exception is logged and the new exception is re-raised. """ type_, value, tb = sys.exc_info() try: diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py index 2fbb0291a..f45372b4d 100644 --- a/ceilometer/openstack/common/importutils.py +++ b/ceilometer/openstack/common/importutils.py @@ -29,7 +29,7 @@ def import_class(import_str): try: __import__(mod_str) return getattr(sys.modules[mod_str], class_str) - except (ImportError, ValueError, AttributeError), exc: + except (ValueError, AttributeError), exc: raise ImportError('Class %s cannot be found (%s)' % (class_str, traceback.format_exception(*sys.exc_info()))) diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 20e6abf0c..3f31eb84a 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import inspect import uuid from ceilometer.openstack.common import cfg diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 4f35d96b8..84f06c1b2 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -19,10 +19,8 @@ import copy import logging -import sys import traceback -from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 3eff97fc6..bf6a81dfc 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -72,7 +72,7 @@ zmq_opts = [ # These globals are defined in register_opts(conf), # a mandatory initialization call -FLAGS = None +CONF = None ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -274,7 +274,7 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(FLAGS, ctx, topic, { + cast(CONF, ctx, topic, { 'method': '-process_reply', 'args': { 'msg_id': msg_id, @@ -329,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.conf = conf self.mapping = {} self.proxies = {} self.threads = [] @@ -405,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = conf.rpc_zmq_ipc_dir + ipc_dir = CONF.rpc_zmq_ipc_dir self.topic_proxy['zmq_replies'] = \ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), @@ -413,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor): self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): - ipc_dir = self.conf.rpc_zmq_ipc_dir + ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. 
references, not copying) data = sock.recv() @@ -487,7 +486,6 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): - self.conf = conf self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): @@ -508,7 +506,7 @@ class Connection(rpc_common.Connection): # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ - (self.conf.rpc_zmq_ipc_dir, topic) + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) @@ -527,7 +525,7 @@ class Connection(rpc_common.Connection): def _cast(addr, context, msg_id, topic, msg, timeout=None): - timeout_cast = timeout or FLAGS.rpc_cast_timeout + timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] with Timeout(timeout_cast, exception=rpc_common.Timeout): @@ -545,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): def _call(addr, context, msg_id, topic, msg, timeout=None): # timeout_response is how long we wait for a response - timeout = timeout or FLAGS.rpc_response_timeout + timeout = timeout or CONF.rpc_response_timeout # The msg_id is used to track replies. msg_id = str(uuid.uuid4().hex) # Replies always come into the reply service. - reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host + reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host LOG.debug(_("Creating payload")) # Curry the original request into a reply method. @@ -573,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, zmq.SUB, subscribe=msg_id, bind=False ) @@ -599,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): # responses for Exceptions. for resp in responses: if isinstance(resp, types.DictType) and 'exc' in resp: - raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) + raise rpc_common.deserialize_remote_exception(CONF, resp['exc']) return responses[-1] @@ -610,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None): dispatches to the matchmaker and sends message to all relevant hosts. """ - conf = FLAGS + conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) queues = matchmaker.queues(topic) @@ -641,26 +639,22 @@ def create_connection(conf, new=True): def multicall(conf, *args, **kwargs): """Multiple calls.""" - register_opts(conf) return _multi_send(_call, *args, **kwargs) def call(conf, *args, **kwargs): """Send a message, expect a response.""" - register_opts(conf) data = _multi_send(_call, *args, **kwargs) return data[-1] def cast(conf, *args, **kwargs): """Send a message expecting no reply.""" - register_opts(conf) _multi_send(_cast, *args, **kwargs) def fanout_cast(conf, context, topic, msg, **kwargs): """Send a message to all listening and expect no reply.""" - register_opts(conf) # NOTE(ewindisch): fanout~ is used because it avoid splitting on . # and acts as a non-subtle hint to the matchmaker and ZmqProxy. _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) @@ -672,7 +666,6 @@ def notify(conf, context, topic, msg, **kwargs): Notifications are sent to topic-priority. This differs from the AMQP drivers which send to topic.priority. """ - register_opts(conf) # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. 
topic.replace('.', '-') @@ -684,7 +677,7 @@ def cleanup(): global ZMQ_CTX global matchmaker matchmaker = None - ZMQ_CTX.destroy() + ZMQ_CTX.term() ZMQ_CTX = None @@ -697,11 +690,11 @@ def register_opts(conf): # We memoize through these globals global ZMQ_CTX global matchmaker - global FLAGS + global CONF - if not FLAGS: + if not CONF: conf.register_opts(zmq_opts) - FLAGS = conf + CONF = conf # Don't re-set, if this method is called twice. if not ZMQ_CTX: ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index 4416a3b19..c4f6cf049 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -93,16 +93,34 @@ def set_time_override(override_time=datetime.datetime.utcnow()): def advance_time_delta(timedelta): - """Advance overriden time using a datetime.timedelta.""" + """Advance overridden time using a datetime.timedelta.""" assert(not utcnow.override_time is None) utcnow.override_time += timedelta def advance_time_seconds(seconds): - """Advance overriden time by seconds.""" + """Advance overridden time by seconds.""" advance_time_delta(datetime.timedelta(0, seconds)) def clear_time_override(): """Remove the overridden time.""" utcnow.override_time = None + + +def marshall_now(now=None): + """Make an rpc-safe datetime with microseconds. + + Note: tzinfo is stripped, but not required for relative times.""" + if not now: + now = utcnow() + return dict(day=now.day, month=now.month, year=now.year, hour=now.hour, + minute=now.minute, second=now.second, + microsecond=now.microsecond) + + +def unmarshall_time(tyme): + """Unmarshall a datetime dict.""" + return datetime.datetime(day=tyme['day'], month=tyme['month'], + year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], + second=tyme['second'], microsecond=tyme['microsecond'])
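
The cfg.py hunks above change each opt_info from always carrying 'override' and 'default' keys (initialised to None) to storing them only when set_override()/set_default() is called, and they add clear_override(), clear_default() and import_opt(). One visible side effect of the _do_get() hunk is that an explicit override or default of None is now honoured instead of being treated as "not set". Below is a rough usage sketch of the new calls, assuming the ceilometer tree with this patch applied is on the Python path; the option name 'fake_opt' and its values are invented purely for illustration and the exact behaviour should be checked against the modules themselves:

    # Sketch only: exercises clear_override()/clear_default() from the cfg.py
    # hunks and the marshall_now()/unmarshall_time() helpers added to
    # timeutils.py. 'fake_opt' is a made-up option name used for illustration.
    from ceilometer.openstack.common import cfg
    from ceilometer.openstack.common import timeutils

    conf = cfg.ConfigOpts()
    conf.register_opt(cfg.StrOpt('fake_opt', default='baseline'))
    conf([])                                   # process an empty arg list

    conf.set_override('fake_opt', 'overridden')
    assert conf.fake_opt == 'overridden'
    conf.clear_override('fake_opt')            # back to the registered default
    assert conf.fake_opt == 'baseline'

    conf.set_default('fake_opt', 'new-default')
    assert conf.fake_opt == 'new-default'
    conf.clear_default('fake_opt')             # back to the opt's own default
    assert conf.fake_opt == 'baseline'

    # timeutils round-trip: marshall_now() returns a plain dict of ints
    # (tzinfo dropped), unmarshall_time() rebuilds the naive datetime.
    now = timeutils.utcnow()
    assert timeutils.unmarshall_time(timeutils.marshall_now(now)) == now

Storing 'override'/'default' only when they are set also keeps _check_required_opts() consistent: an option explicitly overridden or defaulted, even to None, counts as satisfied, while an untouched required option still raises RequiredOptError.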