From c49264a285e039ec88fbae8eff06062270e1f02c Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Wed, 13 Feb 2013 12:14:35 -0500 Subject: [PATCH] Update openstack/common tree This update is a prerequisite for working on moving the notification listener code into Oslo. blueprint move-listener-framework-oslo Change-Id: Iffd0a5903eb378df004de7b919df249bc053aa81 Signed-off-by: Doug Hellmann --- ceilometer/openstack/common/cfg.py | 30 +++++-- ceilometer/openstack/common/jsonutils.py | 43 +++++----- ceilometer/openstack/common/local.py | 11 +++ ceilometer/openstack/common/rpc/__init__.py | 60 +++++++++++--- ceilometer/openstack/common/rpc/amqp.py | 2 +- ceilometer/openstack/common/rpc/common.py | 2 +- ceilometer/openstack/common/rpc/impl_kombu.py | 23 +++-- ceilometer/openstack/common/rpc/impl_qpid.py | 18 +++- ceilometer/openstack/common/rpc/impl_zmq.py | 83 ++++++++----------- ceilometer/openstack/common/rpc/matchmaker.py | 17 ++-- ceilometer/openstack/common/setup.py | 30 +++++-- ceilometer/openstack/common/timeutils.py | 18 ++++ ceilometer/openstack/common/version.py | 21 ++++- 13 files changed, 235 insertions(+), 123 deletions(-) diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py index 1beb29889..7fc90e602 100644 --- a/ceilometer/openstack/common/cfg.py +++ b/ceilometer/openstack/common/cfg.py @@ -863,7 +863,7 @@ class SubCommandOpt(Opt): description=self.description, help=self.help) - if not self.handler is None: + if self.handler is not None: self.handler(subparsers) @@ -1297,6 +1297,24 @@ class ConfigOpts(collections.Mapping): __import__(module_str) self._get_opt_info(name, group) + def import_group(self, group, module_str): + """Import an option group from a module. + + Import a module and check that a given option group is registered. + + This is intended for use with global configuration objects + like cfg.CONF where modules commonly register options with + CONF at module load time. If one module requires an option group + defined by another module it can use this method to explicitly + declare the dependency. + + :param group: an option OptGroup object or group name + :param module_str: the name of a module to import + :raises: ImportError, NoSuchGroupError + """ + __import__(module_str) + self._get_group(group) + @__clear_cache def set_override(self, name, override, group=None): """Override an opt value. @@ -1547,8 +1565,8 @@ class ConfigOpts(collections.Mapping): group = group_or_name if isinstance(group_or_name, OptGroup) else None group_name = group.name if group else group_or_name - if not group_name in self._groups: - if not group is None or not autocreate: + if group_name not in self._groups: + if group is not None or not autocreate: raise NoSuchGroupError(group_name) self.register_group(OptGroup(name=group_name)) @@ -1568,7 +1586,7 @@ class ConfigOpts(collections.Mapping): group = self._get_group(group) opts = group._opts - if not opt_name in opts: + if opt_name not in opts: raise NoSuchOptError(opt_name, group) return opts[opt_name] @@ -1606,7 +1624,7 @@ class ConfigOpts(collections.Mapping): opt = info['opt'] if opt.required: - if ('default' in info or 'override' in info): + if 'default' in info or 'override' in info: continue if self._get(opt.dest, group) is None: @@ -1625,7 +1643,7 @@ class ConfigOpts(collections.Mapping): """ self._args = args - for opt, group in self._all_cli_opts(): + for opt, group in sorted(self._all_cli_opts()): opt._add_to_cli(self._oparser, group) return vars(self._oparser.parse_args(args)) diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index 47ec79e80..c34a8c327 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -34,15 +34,21 @@ This module provides a few things: import datetime +import functools import inspect import itertools import json +import logging import xmlrpclib +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import timeutils +LOG = logging.getLogger(__name__) -def to_primitive(value, convert_instances=False, level=0): + +def to_primitive(value, convert_instances=False, convert_datetime=True, + level=0, max_depth=3): """Convert a complex object into primitives. Handy for JSON serialization. We can optionally handle instances, @@ -78,12 +84,19 @@ def to_primitive(value, convert_instances=False, level=0): if getattr(value, '__module__', None) == 'mox': return 'mock' - if level > 3: + if level > max_depth: + LOG.error(_('Max serialization depth exceeded on object: %d %s'), + level, value) return '?' # The try block may not be necessary after the class check above, # but just in case ... try: + recursive = functools.partial(to_primitive, + convert_instances=convert_instances, + convert_datetime=convert_datetime, + level=level, + max_depth=max_depth) # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled @@ -91,33 +104,19 @@ def to_primitive(value, convert_instances=False, level=0): value = datetime.datetime(*tuple(value.timetuple())[:6]) if isinstance(value, (list, tuple)): - o = [] - for v in value: - o.append(to_primitive(v, convert_instances=convert_instances, - level=level)) - return o + return [recursive(v) for v in value] elif isinstance(value, dict): - o = {} - for k, v in value.iteritems(): - o[k] = to_primitive(v, convert_instances=convert_instances, - level=level) - return o - elif isinstance(value, datetime.datetime): + return dict((k, recursive(v)) for k, v in value.iteritems()) + elif convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) elif hasattr(value, 'iteritems'): - return to_primitive(dict(value.iteritems()), - convert_instances=convert_instances, - level=level + 1) + return recursive(dict(value.iteritems()), level=level + 1) elif hasattr(value, '__iter__'): - return to_primitive(list(value), - convert_instances=convert_instances, - level=level) + return recursive(list(value)) elif convert_instances and hasattr(value, '__dict__'): # Likely an instance of something. Watch for cycles. # Ignore class member vars. - return to_primitive(value.__dict__, - convert_instances=convert_instances, - level=level + 1) + return recursive(value.__dict__, level=level + 1) else: return value except TypeError: diff --git a/ceilometer/openstack/common/local.py b/ceilometer/openstack/common/local.py index 19d962732..8bdc837a9 100644 --- a/ceilometer/openstack/common/local.py +++ b/ceilometer/openstack/common/local.py @@ -26,6 +26,9 @@ class WeakLocal(corolocal.local): def __getattribute__(self, attr): rval = corolocal.local.__getattribute__(self, attr) if rval: + # NOTE(mikal): this bit is confusing. What is stored is a weak + # reference, not the value itself. We therefore need to lookup + # the weak reference and return the inner value here. rval = rval() return rval @@ -34,4 +37,12 @@ class WeakLocal(corolocal.local): return corolocal.local.__setattr__(self, attr, value) +# NOTE(mikal): the name "store" should be deprecated in the future store = WeakLocal() + +# A "weak" store uses weak references and allows an object to fall out of scope +# when it falls out of scope in the code that uses the thread local storage. A +# "strong" store will hold a reference to the object so that it never falls out +# of scope. +weak_store = WeakLocal() +strong_store = corolocal.local diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index e91ac20ec..b127a9502 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see: rpc.proxy """ +import inspect +import logging + from ceilometer.openstack.common import cfg +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils +from ceilometer.openstack.common import local + + +LOG = logging.getLogger(__name__) rpc_opts = [ @@ -62,7 +70,8 @@ rpc_opts = [ help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] -cfg.CONF.register_opts(rpc_opts) +CONF = cfg.CONF +CONF.register_opts(rpc_opts) def set_defaults(control_exchange): @@ -83,10 +92,27 @@ def create_connection(new=True): :returns: An instance of openstack.common.rpc.common.Connection """ - return _get_impl().create_connection(cfg.CONF, new=new) + return _get_impl().create_connection(CONF, new=new) -def call(context, topic, msg, timeout=None): +def _check_for_lock(): + if not CONF.debug: + return None + + if ((hasattr(local.strong_store, 'locks_held') + and local.strong_store.locks_held)): + stack = ' :: '.join([frame[3] for frame in inspect.stack()]) + LOG.warn(_('A RPC is being made while holding a lock. The locks ' + 'currently held are %(locks)s. This is probably a bug. ' + 'Please report it. Include the following: [%(stack)s].'), + {'locks': local.strong_store.locks_held, + 'stack': stack}) + return True + + return False + + +def call(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method that returns something. :param context: Information that identifies the user that has made this @@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: A dict from the remote method. :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().call(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().call(CONF, context, topic, msg, timeout) def cast(context, topic, msg): @@ -124,7 +154,7 @@ def cast(context, topic, msg): :returns: None """ - return _get_impl().cast(cfg.CONF, context, topic, msg) + return _get_impl().cast(CONF, context, topic, msg) def fanout_cast(context, topic, msg): @@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg): :returns: None """ - return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) + return _get_impl().fanout_cast(CONF, context, topic, msg) -def multicall(context, topic, msg, timeout=None): +def multicall(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method and get back an iterator. In this case, the remote method will be returning multiple values in @@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: An iterator. The iterator will yield a tuple (N, X) where N is an index that starts at 0 and increases by one for each value @@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().multicall(CONF, context, topic, msg, timeout) def notify(context, topic, msg, envelope=False): @@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, + return _get_impl().cast_to_server(CONF, context, server_params, topic, msg) @@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, + return _get_impl().fanout_cast_to_server(CONF, context, server_params, topic, msg) @@ -263,10 +297,10 @@ def _get_impl(): global _RPCIMPL if _RPCIMPL is None: try: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + _RPCIMPL = importutils.import_module(CONF.rpc_backend) except ImportError: # For backwards compatibility with older nova config. - impl = cfg.CONF.rpc_backend.replace('nova.rpc', - 'nova.openstack.common.rpc') + impl = CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index a69b0cedc..9adcbd099 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 0bfbb8a37..b65236485 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data): # NOTE(ameade): We DO NOT want to allow just any module to be imported, in # order to prevent arbitrary code execution. - if not module in conf.allowed_rpc_exception_modules: + if module not in conf.allowed_rpc_exception_modules: return RemoteError(name, failure.get('message'), trace) try: diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index e120e3bbe..3672dbe66 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -66,7 +66,8 @@ kombu_opts = [ help='the RabbitMQ userid'), cfg.StrOpt('rabbit_password', default='guest', - help='the RabbitMQ password'), + help='the RabbitMQ password', + secret=True), cfg.StrOpt('rabbit_virtual_host', default='/', help='the RabbitMQ virtual host'), @@ -302,9 +303,15 @@ class Publisher(object): channel=channel, routing_key=self.routing_key) - def send(self, msg): + def send(self, msg, timeout=None): """Send a message""" - self.producer.publish(msg) + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. + # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) class DirectPublisher(Publisher): @@ -653,7 +660,7 @@ class Connection(object): for proxy_cb in self.proxy_callbacks: proxy_cb.wait() - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): """Send to a publisher based on the publisher class""" def _error_callback(exc): @@ -663,7 +670,7 @@ class Connection(object): def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) - publisher.send(msg) + publisher.send(msg, timeout) self.ensure(_error_callback, _publish) @@ -691,9 +698,9 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + self.publisher_send(TopicPublisher, topic, msg, timeout) def fanout_send(self, topic, msg): """Send a 'fanout' message""" @@ -701,7 +708,7 @@ class Connection(object): def notify_send(self, topic, msg, **kwargs): """Send a notify message on a topic""" - self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) def consume(self, limit=None): """Consume from all queues/consumers""" diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 0defe0216..46953974b 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -51,7 +51,8 @@ qpid_opts = [ help='Username for qpid connection'), cfg.StrOpt('qpid_password', default='', - help='Password for qpid connection'), + help='Password for qpid connection', + secret=True), cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), @@ -486,9 +487,20 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): """Send a 'fanout' message""" diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 1c22ec8db..49e610c37 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -17,7 +17,6 @@ import os import pprint import socket -import string import sys import types import uuid @@ -90,7 +89,7 @@ def _serialize(data): Error if a developer passes us bad data. """ try: - return str(jsonutils.dumps(data, ensure_ascii=True)) + return jsonutils.dumps(data, ensure_ascii=True) except TypeError: LOG.error(_("JSON serialization failed.")) raise @@ -218,10 +217,11 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + msg_id = msg_id or 0 + if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) def close(self): self.outq.close() @@ -295,13 +295,13 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(CONF, ctx, topic, { + _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { - 'msg_id': msg_id, + 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }) + }, _msg_id=msg_id) class ConsumerBase(object): @@ -321,21 +321,22 @@ class ConsumerBase(object): return [result] def process(self, style, target, proxy, ctx, data): + data.setdefault('version', None) + data.setdefault('args', {}) + # Method starting with - are # processed internally. (non-valid method name) - method = data['method'] + method = data.get('method') + if not method: + LOG.error(_("RPC message did not include method.")) + return # Internal method # uses internal context for safety. - if data['method'][0] == '-': - # For reply / process_reply - method = method[1:] - if method == 'reply': - self.private_ctx.reply(ctx, proxy, **data['args']) + if method == '-reply': + self.private_ctx.reply(ctx, proxy, **data['args']) return - data.setdefault('version', None) - data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -436,20 +437,12 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - # Handle zmq_replies magic - if topic.startswith('fanout~'): + if topic.startswith('fanout~') or topic.startswith('zmq_replies'): sock_type = zmq.PUB - elif topic.startswith('zmq_replies'): - sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(_deserialize(in_msg)) - msg_id = inside[-1]['args']['msg_id'] - response = inside[-1]['args']['response'] - LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: + if topic not in self.topic_proxy: def publisher(waiter): LOG.info(_("Creating proxy for topic: %s"), topic) @@ -600,8 +593,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _cast(addr, context, topic, msg, timeout=None, serialize=True, + force_envelope=False, _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None, +def _call(addr, context, topic, msg, timeout=None, serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload, + _cast(addr, context, topic, payload, serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) @@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = _deserialize(msg[-1])[-1]['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) finally: if 'msg_waiter' in vars(): msg_waiter.close() @@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False): + force_envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize, - force_envelope) + _topic, msg, timeout, serialize, + force_envelope, _msg_id) return - return method(_addr, context, _topic, _topic, msg, timeout, + return method(_addr, context, _topic, msg, timeout, serialize, force_envelope) @@ -777,21 +772,9 @@ def _get_ctxt(): return ZMQ_CTX -def _get_matchmaker(): +def _get_matchmaker(*args, **kwargs): global matchmaker if not matchmaker: - # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = CONF.rpc_zmq_matchmaker.split('.') - mm_module = '.'.join(mm_path[:-1]) - mm_class = mm_path[-1] - - # Only initialize a class. - if mm_path[-1][0] not in string.ascii_uppercase: - LOG.error(_("Matchmaker could not be loaded.\n" - "rpc_zmq_matchmaker is not a class.")) - raise RPCException(_("Error loading Matchmaker.")) - - mm_impl = importutils.import_module(mm_module) - mm_constructor = getattr(mm_impl, mm_class) - matchmaker = mm_constructor() + matchmaker = importutils.import_object( + CONF.rpc_zmq_matchmaker, *args, **kwargs) return matchmaker diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index 87679e8e8..dbbd57c73 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -201,24 +201,25 @@ class FanoutRingExchange(RingExchange): class LocalhostExchange(Exchange): """Exchange where all direct topics are local.""" - def __init__(self): + def __init__(self, host='localhost'): + self.host = host super(Exchange, self).__init__() def run(self, key): - return [(key.split('.')[0] + '.localhost', 'localhost')] + return [('.'.join((key.split('.')[0], self.host)), self.host)] class DirectExchange(Exchange): """ Exchange where all topic keys are split, sending to second half. - i.e. "compute.host" sends a message to "compute" running on "host" + i.e. "compute.host" sends a message to "compute.host" running on "host" """ def __init__(self): super(Exchange, self).__init__() def run(self, key): - b, e = key.split('.', 1) - return [(b, e)] + e = key.split('.', 1)[1] + return [(key, e)] class MatchMakerRing(MatchMakerBase): @@ -237,11 +238,11 @@ class MatchMakerLocalhost(MatchMakerBase): Match Maker where all bare topics resolve to localhost. Useful for testing. """ - def __init__(self): + def __init__(self, host='localhost'): super(MatchMakerLocalhost, self).__init__() - self.add_binding(FanoutBinding(), LocalhostExchange()) + self.add_binding(FanoutBinding(), LocalhostExchange(host)) self.add_binding(DirectBinding(), DirectExchange()) - self.add_binding(TopicBinding(), LocalhostExchange()) + self.add_binding(TopicBinding(), LocalhostExchange(host)) class MatchMakerStub(MatchMakerBase): diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py index 81a3d20e9..35680b304 100644 --- a/ceilometer/openstack/common/setup.py +++ b/ceilometer/openstack/common/setup.py @@ -258,7 +258,23 @@ def get_cmdclass(): return cmdclass -def get_version_from_git(pre_version): +def _get_revno(): + """Return the number of commits since the most recent tag. + + We use git-describe to find this out, but if there are no + tags then we fall back to counting commits since the beginning + of time. + """ + describe = _run_shell_command("git describe --always") + if "-" in describe: + return describe.rsplit("-", 2)[-2] + + # no tags found + revlist = _run_shell_command("git rev-list --abbrev-commit HEAD") + return len(revlist.splitlines()) + + +def _get_version_from_git(pre_version): """Return a version which is equal to the tag that's on the current revision if there is one, or tag plus number of additional revisions if the current revision has no tag.""" @@ -266,21 +282,19 @@ def get_version_from_git(pre_version): if os.path.isdir('.git'): if pre_version: try: - return _run_shell_command( + return _run_shell_command( "git describe --exact-match", throw_on_error=True).replace('-', '.') except Exception: sha = _run_shell_command("git log -n1 --pretty=format:%h") - describe = _run_shell_command("git describe --always") - revno = describe.rsplit("-", 2)[-2] - return "%s.a%s.g%s" % (pre_version, revno, sha) + return "%s.a%s.g%s" % (pre_version, _get_revno(), sha) else: return _run_shell_command( "git describe --always").replace('-', '.') return None -def get_version_from_pkg_info(package_name): +def _get_version_from_pkg_info(package_name): """Get the version from PKG-INFO file if we can.""" try: pkg_info_file = open('PKG-INFO', 'r') @@ -311,10 +325,10 @@ def get_version(package_name, pre_version=None): version = os.environ.get("OSLO_PACKAGE_VERSION", None) if version: return version - version = get_version_from_pkg_info(package_name) + version = _get_version_from_pkg_info(package_name) if version: return version - version = get_version_from_git(pre_version) + version = _get_version_from_git(pre_version) if version: return version raise Exception("Versioning for this project requires either an sdist" diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index 0f346087f..e2c274057 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -98,6 +98,11 @@ def utcnow(): return datetime.datetime.utcnow() +def iso8601_from_timestamp(timestamp): + """Returns a iso8601 formated date from timestamp""" + return isotime(datetime.datetime.utcfromtimestamp(timestamp)) + + utcnow.override_time = None @@ -162,3 +167,16 @@ def delta_seconds(before, after): except AttributeError: return ((delta.days * 24 * 3600) + delta.seconds + float(delta.microseconds) / (10 ** 6)) + + +def is_soon(dt, window): + """ + Determines if time is going to happen in the next window seconds. + + :params dt: the time + :params window: minimum seconds to remain to consider the time not soon + + :return: True if expiration is within the given duration + """ + soon = (utcnow() + datetime.timedelta(seconds=window)) + return normalize_time(dt) <= soon diff --git a/ceilometer/openstack/common/version.py b/ceilometer/openstack/common/version.py index 3653ad0d0..5500f1b98 100644 --- a/ceilometer/openstack/common/version.py +++ b/ceilometer/openstack/common/version.py @@ -33,12 +33,27 @@ class VersionInfo(object): self.version = None self._cached_version = None + def __str__(self): + """Make the VersionInfo object behave like a string.""" + return self.version_string() + + def __repr__(self): + """Include the name.""" + return "VersionInfo(%s:%s)" % (self.package, self.version_string()) + def _get_version_from_pkg_resources(self): """Get the version of the package from the pkg_resources record associated with the package.""" - requirement = pkg_resources.Requirement.parse(self.package) - provider = pkg_resources.get_provider(requirement) - return provider.version + try: + requirement = pkg_resources.Requirement.parse(self.package) + provider = pkg_resources.get_provider(requirement) + return provider.version + except pkg_resources.DistributionNotFound: + # The most likely cause for this is running tests in a tree + # produced from a tarball where the package itself has not been + # installed into anything. Revert to setup-time logic. + from ceilometer.openstack.common import setup + return setup.get_version(self.package) def release_string(self): """Return the full version of the package including suffixes indicating