diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py index 3ff6b89b6..f710d11e6 100644 --- a/ceilometer/openstack/common/cfg.py +++ b/ceilometer/openstack/common/cfg.py @@ -42,8 +42,8 @@ Options can be strings, integers, floats, booleans, lists or 'multi strings':: osapi_compute_extension_opt = cfg.MultiStrOpt('osapi_compute_extension', default=DEFAULT_EXTENSIONS) -Option schemas are registered with with the config manager at runtime, but -before the option is referenced:: +Option schemas are registered with the config manager at runtime, but before +the option is referenced:: class ExtensionManager(object): @@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt): :raises: DuplicateOptError if a naming conflict is detected """ if opt.dest in opts: - if opts[opt.dest]['opt'] is not opt: + if opts[opt.dest]['opt'] != opt: raise DuplicateOptError(opt.name) return True else: @@ -527,6 +527,9 @@ class Opt(object): else: self.deprecated_name = None + def __ne__(self, another): + return vars(self) != vars(another) + def _get_from_config_parser(self, cparser, section): """Retrieves the option value from a MultiConfigParser object. diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py index 67d94ad5f..2fbb0291a 100644 --- a/ceilometer/openstack/common/importutils.py +++ b/ceilometer/openstack/common/importutils.py @@ -20,6 +20,7 @@ Import related utilities and helper functions. """ import sys +import traceback def import_class(import_str): @@ -30,7 +31,8 @@ def import_class(import_str): return getattr(sys.modules[mod_str], class_str) except (ImportError, ValueError, AttributeError), exc: raise ImportError('Class %s cannot be found (%s)' % - (class_str, str(exc))) + (class_str, + traceback.format_exception(*sys.exc_info()))) def import_object(import_str, *args, **kwargs): diff --git a/ceilometer/openstack/common/iniparser.py b/ceilometer/openstack/common/iniparser.py index e91eea538..241284449 100644 --- a/ceilometer/openstack/common/iniparser.py +++ b/ceilometer/openstack/common/iniparser.py @@ -53,7 +53,8 @@ class BaseParser(object): key, value = line[:colon], line[colon + 1:] value = value.strip() - if value[0] == value[-1] and value[0] == "\"" or value[0] == "'": + if ((value and value[0] == value[-1]) and + (value[0] == "\"" or value[0] == "'")): value = value[1:-1] return key.strip(), [value] diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index 752266981..dad84dfd5 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -39,6 +39,8 @@ import itertools import json import xmlrpclib +from ceilometer.openstack.common import timeutils + def to_primitive(value, convert_instances=False, level=0): """Convert a complex object into primitives. @@ -101,13 +103,15 @@ def to_primitive(value, convert_instances=False, level=0): level=level) return o elif isinstance(value, datetime.datetime): - return str(value) + return timeutils.strtime(value) elif hasattr(value, 'iteritems'): return to_primitive(dict(value.iteritems()), convert_instances=convert_instances, - level=level) + level=level + 1) elif hasattr(value, '__iter__'): - return to_primitive(list(value), level) + return to_primitive(list(value), + convert_instances=convert_instances, + level=level) elif convert_instances and hasattr(value, '__dict__'): # Likely an instance of something. Watch for cycles. # Ignore class member vars. @@ -130,11 +134,15 @@ def loads(s): return json.loads(s) +def load(s): + return json.load(s) + + try: import anyjson except ImportError: pass else: anyjson._modules.append((__name__, 'dumps', TypeError, - 'loads', ValueError)) + 'loads', ValueError, 'load')) anyjson.force_implementation(__name__) diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index c1454a8a0..e078f3e59 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -41,6 +41,7 @@ import sys import traceback from ceilometer.openstack.common import cfg +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import local from ceilometer.openstack.common import notifier @@ -65,13 +66,13 @@ log_opts = [ help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', default=[ - 'amqplib=WARN', - 'sqlalchemy=WARN', - 'boto=WARN', - 'suds=INFO', - 'keystone=INFO', - 'eventlet.wsgi.server=WARN' - ], + 'amqplib=WARN', + 'sqlalchemy=WARN', + 'boto=WARN', + 'suds=INFO', + 'keystone=INFO', + 'eventlet.wsgi.server=WARN' + ], help='list of logger=LEVEL pairs'), cfg.BoolOpt('publish_errors', default=False, @@ -88,7 +89,7 @@ log_opts = [ default='[instance: %(uuid)s] ', help='If an instance UUID is passed with the log message, ' 'format it like this'), - ] +] generic_log_opts = [ @@ -104,7 +105,7 @@ generic_log_opts = [ cfg.StrOpt('logfile_mode', default='0644', help='Default file mode used when creating log files'), - ] +] CONF = cfg.CONF @@ -207,9 +208,9 @@ class JSONFormatter(logging.Formatter): def formatException(self, ei, strip_newlines=True): lines = traceback.format_exception(*ei) if strip_newlines: - lines = [itertools.ifilter(lambda x: x, - line.rstrip().splitlines()) - for line in lines] + lines = [itertools.ifilter( + lambda x: x, + line.rstrip().splitlines()) for line in lines] lines = list(itertools.chain(*lines)) return lines @@ -246,26 +247,27 @@ class JSONFormatter(logging.Formatter): class PublishErrorsHandler(logging.Handler): def emit(self, record): - if 'list_notifier_drivers' in CONF: - if ('ceilometer.openstack.common.notifier.log_notifier' in - CONF.list_notifier_drivers): - return + if ('ceilometer.openstack.common.notifier.log_notifier' in + CONF.notification_driver): + return notifier.api.notify(None, 'error.publisher', - 'error_notification', - notifier.api.ERROR, - dict(error=record.msg)) + 'error_notification', + notifier.api.ERROR, + dict(error=record.msg)) -def handle_exception(type, value, tb): - extra = {} - if CONF.verbose: - extra['exc_info'] = (type, value, tb) - getLogger().critical(str(value), **extra) +def _create_logging_excepthook(product_name): + def logging_excepthook(type, value, tb): + extra = {} + if CONF.verbose: + extra['exc_info'] = (type, value, tb) + getLogger(product_name).critical(str(value), **extra) + return logging_excepthook def setup(product_name): """Setup logging.""" - sys.excepthook = handle_exception + sys.excepthook = _create_logging_excepthook(product_name) if CONF.log_config: try: @@ -356,17 +358,6 @@ def _setup_logging_from_conf(product_name): for handler in log_root.handlers: logger.addHandler(handler) - # NOTE(jkoelker) Clear the handlers for the root logger that was setup - # by basicConfig in nova/__init__.py and install the - # NullHandler. - root = logging.getLogger() - for handler in root.handlers: - root.removeHandler(handler) - handler = NullHandler() - handler.setFormatter(logging.Formatter()) - root.addHandler(handler) - - _loggers = {} @@ -404,8 +395,12 @@ class LegacyFormatter(logging.Formatter): def format(self, record): """Uses contextstring if request_id is set, otherwise default.""" - if 'instance' not in record.__dict__: - record.__dict__['instance'] = '' + # NOTE(sdague): default the fancier formating params + # to an empty string so we don't throw an exception if + # they get used + for key in ('instance', 'color'): + if key not in record.__dict__: + record.__dict__[key] = '' if record.__dict__.get('request_id', None): self._fmt = CONF.logging_context_format_string diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 602d99c3d..20e6abf0c 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -18,6 +18,7 @@ import uuid from ceilometer.openstack.common import cfg from ceilometer.openstack.common import context +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import log as logging @@ -27,16 +28,17 @@ from ceilometer.openstack.common import timeutils LOG = logging.getLogger(__name__) notifier_opts = [ - cfg.StrOpt('notification_driver', - default='ceilometer.openstack.common.notifier.no_op_notifier', - help='Default driver for sending notifications'), + cfg.MultiStrOpt('notification_driver', + default=[], + deprecated_name='list_notifier_drivers', + help='Driver or drivers to handle sending notifications'), cfg.StrOpt('default_notification_level', default='INFO', help='Default notification level for outgoing notifications'), cfg.StrOpt('default_publisher_id', default='$host', help='Default publisher_id for outgoing notifications'), - ] +] CONF = cfg.CONF CONF.register_opts(notifier_opts) @@ -121,21 +123,60 @@ def notify(context, publisher_id, event_type, priority, payload): """ if priority not in log_levels: raise BadPriorityException( - _('%s not in valid priorities') % priority) + _('%s not in valid priorities') % priority) # Ensure everything is JSON serializable. payload = jsonutils.to_primitive(payload, convert_instances=True) - driver = importutils.import_module(CONF.notification_driver) msg = dict(message_id=str(uuid.uuid4()), - publisher_id=publisher_id, - event_type=event_type, - priority=priority, - payload=payload, - timestamp=str(timeutils.utcnow())) - try: - driver.notify(context, msg) - except Exception, e: - LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system. Payload=%(payload)s") % - locals()) + publisher_id=publisher_id, + event_type=event_type, + priority=priority, + payload=payload, + timestamp=str(timeutils.utcnow())) + + for driver in _get_drivers(): + try: + driver.notify(context, msg) + except Exception, e: + LOG.exception(_("Problem '%(e)s' attempting to " + "send to notification system. Payload=%(payload)s") % + locals()) + + +_drivers = None + + +def _get_drivers(): + """Instantiate, cache, and return drivers based on the CONF.""" + global _drivers + if _drivers is None: + _drivers = {} + for notification_driver in CONF.notification_driver: + add_driver(notification_driver) + + return _drivers.values() + + +def add_driver(notification_driver): + """Add a notification driver at runtime.""" + # Make sure the driver list is initialized. + _get_drivers() + if isinstance(notification_driver, basestring): + # Load and add + try: + driver = importutils.import_module(notification_driver) + _drivers[notification_driver] = driver + except ImportError as e: + LOG.exception(_("Failed to load notifier %s. " + "These notifications will not be sent.") % + notification_driver) + else: + # Driver is already loaded; just add the object. + _drivers[notification_driver] = notification_driver + + +def _reset_drivers(): + """Used by unit tests to reset the drivers.""" + global _drivers + _drivers = None diff --git a/ceilometer/openstack/common/notifier/list_notifier.py b/ceilometer/openstack/common/notifier/list_notifier.py index c096821f0..b8533f9f5 100644 --- a/ceilometer/openstack/common/notifier/list_notifier.py +++ b/ceilometer/openstack/common/notifier/list_notifier.py @@ -14,13 +14,15 @@ # under the License. from ceilometer.openstack.common import cfg +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import log as logging -list_notifier_drivers_opt = cfg.MultiStrOpt('list_notifier_drivers', - default=['ceilometer.openstack.common.notifier.no_op_notifier'], - help='List of drivers to send notifications') +list_notifier_drivers_opt = cfg.MultiStrOpt( + 'list_notifier_drivers', + default=['ceilometer.openstack.common.notifier.no_op_notifier'], + help='List of drivers to send notifications') CONF = cfg.CONF CONF.register_opt(list_notifier_drivers_opt) diff --git a/ceilometer/openstack/common/notifier/log_notifier.py b/ceilometer/openstack/common/notifier/log_notifier.py index 5f3d3fd6b..33acbe08c 100644 --- a/ceilometer/openstack/common/notifier/log_notifier.py +++ b/ceilometer/openstack/common/notifier/log_notifier.py @@ -30,6 +30,6 @@ def notify(_context, message): CONF.default_notification_level) priority = priority.lower() logger = logging.getLogger( - 'ceilometer.openstack.common.notification.%s' - % message['event_type']) + 'ceilometer.openstack.common.notification.%s' % + message['event_type']) getattr(logger, priority)(jsonutils.dumps(message)) diff --git a/ceilometer/openstack/common/notifier/rabbit_notifier.py b/ceilometer/openstack/common/notifier/rabbit_notifier.py index 604a23dfb..85272a155 100644 --- a/ceilometer/openstack/common/notifier/rabbit_notifier.py +++ b/ceilometer/openstack/common/notifier/rabbit_notifier.py @@ -16,14 +16,15 @@ from ceilometer.openstack.common import cfg from ceilometer.openstack.common import context as req_context +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import rpc LOG = logging.getLogger(__name__) -notification_topic_opt = cfg.ListOpt('notification_topics', - default=['notifications', ], - help='AMQP topic used for openstack notifications') +notification_topic_opt = cfg.ListOpt( + 'notification_topics', default=['notifications', ], + help='AMQP topic used for openstack notifications') CONF = cfg.CONF CONF.register_opt(notification_topic_opt) diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index 858e2e60c..c35521042 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -48,7 +48,8 @@ rpc_opts = [ 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', default=['ceilometer.openstack.common.exception', - 'nova.exception'], + 'nova.exception', + ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), cfg.StrOpt('control_exchange', diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index 17e82300e..16635149d 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -35,6 +35,7 @@ from eventlet import pools from eventlet import semaphore from ceilometer.openstack.common import excutils +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import local from ceilometer.openstack.common.rpc import common as rpc_common diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index ee493c689..4f35d96b8 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -23,10 +23,10 @@ import sys import traceback from ceilometer.openstack.common import cfg +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import local -from ceilometer.openstack.common.gettextutils import _ LOG = logging.getLogger(__name__) @@ -108,7 +108,7 @@ class Connection(object): """ raise NotImplementedError() - def create_consumer(self, conf, topic, proxy, fanout=False): + def create_consumer(self, topic, proxy, fanout=False): """Create a consumer on this connection. A consumer is associated with a message queue on the backend message @@ -117,7 +117,6 @@ class Connection(object): off of the queue will determine which method gets called on the proxy object. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. For example, all instances of nova-compute consume @@ -133,7 +132,7 @@ class Connection(object): """ raise NotImplementedError() - def create_worker(self, conf, topic, proxy, pool_name): + def create_worker(self, topic, proxy, pool_name): """Create a worker on this connection. A worker is like a regular consumer of messages directed to a @@ -143,7 +142,6 @@ class Connection(object): be asked to process it. Load is distributed across the members of the pool in round-robin fashion. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py index c42f2a096..d1136acb1 100644 --- a/ceilometer/openstack/common/rpc/dispatcher.py +++ b/ceilometer/openstack/common/rpc/dispatcher.py @@ -40,6 +40,45 @@ The conversion over to a versioned API must be done on both the client side and server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. + + +EXAMPLES: + +Nova was the first project to use versioned rpc APIs. Consider the compute rpc +API as an example. The client side is in nova/compute/rpcapi.py and the server +side is in nova/compute/manager.py. + + +Example 1) Adding a new method. + +Adding a new method is a backwards compatible change. It should be added to +nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to +X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should +have a specific version specified to indicate the minimum API version that must +be implemented for the method to be supported. For example: + + def get_host_uptime(self, ctxt, host): + topic = _compute_topic(self.topic, ctxt, host, None) + return self.call(ctxt, self.make_msg('get_host_uptime'), topic, + version='1.1') + +In this case, version '1.1' is the first version that supported the +get_host_uptime() method. + + +Example 2) Adding a new parameter. + +Adding a new parameter to an rpc method can be made backwards compatible. The +RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. +The implementation of the method must not expect the parameter to be present. + + def some_remote_method(self, arg1, arg2, newarg=None): + # The code needs to deal with newarg=None for cases + # where an older client sends a message without it. + pass + +On the client side, the same changes should be made as in example 1. The +minimum version that supports the new parameter should be specified. """ from ceilometer.openstack.common.rpc import common as rpc_common diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index c72a88710..d50a8b1cf 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -17,24 +17,18 @@ import functools import itertools -import json import logging import time import uuid import eventlet import greenlet -try: - import qpid.messaging - import qpid.messaging.exceptions -except ImportError: - # FIXME(dhellmann): Trying to prevent errors - # running the tests on stackforge, where qpid - # is not installed. - pass +import qpid.messaging +import qpid.messaging.exceptions from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common.rpc import amqp as rpc_amqp from ceilometer.openstack.common.rpc import common as rpc_common @@ -131,7 +125,7 @@ class ConsumerBase(object): addr_opts["node"]["x-declare"].update(node_opts) addr_opts["link"]["x-declare"].update(link_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -147,7 +141,7 @@ class ConsumerBase(object): try: self.callback(message.content) except Exception: - logging.exception(_("Failed to process message... skipping it.")) + LOG.exception(_("Failed to process message... skipping it.")) finally: self.session.acknowledge(message) @@ -236,7 +230,7 @@ class Publisher(object): if node_opts: addr_opts["node"]["x-declare"].update(node_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -335,7 +329,7 @@ class Connection(object): if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( self.conf.qpid_reconnect_interval) - self.connection.hearbeat = self.conf.qpid_heartbeat + self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 45f5a557c..3eff97fc6 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -47,10 +47,12 @@ zmq_opts = [ 'address.'), # The module.Class to use for matchmaking. - cfg.StrOpt('rpc_zmq_matchmaker', - default='ceilometer.' - 'openstack.common.rpc.matchmaker.MatchMakerLocalhost', - help='MatchMaker driver'), + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('ceilometer.openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, @@ -61,9 +63,10 @@ zmq_opts = [ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), help='Name of this node. Must be a valid hostname, FQDN, or ' - 'IP address') + 'IP address. Must match "host" option, if running Nova.') ] @@ -128,7 +131,7 @@ class ZmqSocket(object): 'subscribe': subscribe, 'bind': bind} LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data) - LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) + LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) LOG.debug(_("-> bind: %(bind)s"), str_data) try: @@ -717,3 +720,6 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() + + +register_opts(cfg.CONF) diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index 44c04cca3..bf4f85d4b 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -24,6 +24,7 @@ import json import logging from ceilometer.openstack.common import cfg +from ceilometer.openstack.common.gettextutils import _ matchmaker_opts = [ diff --git a/ceilometer/openstack/common/rpc/proxy.py b/ceilometer/openstack/common/rpc/proxy.py index 493f88488..42e596213 100644 --- a/ceilometer/openstack/common/rpc/proxy.py +++ b/ceilometer/openstack/common/rpc/proxy.py @@ -112,11 +112,12 @@ class RpcProxy(object): self._set_version(msg, version) rpc.cast(context, self._get_topic(topic), msg) - def fanout_cast(self, context, msg, version=None): + def fanout_cast(self, context, msg, topic=None, version=None): """rpc.fanout_cast() a remote method. :param context: The request context :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. :param version: (Optional) Override the requested API version in this message. @@ -124,7 +125,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) - rpc.fanout_cast(context, self.topic, msg) + rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, version=None): @@ -144,13 +145,15 @@ class RpcProxy(object): self._set_version(msg, version) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) - def fanout_cast_to_server(self, context, server_params, msg, version=None): + def fanout_cast_to_server(self, context, server_params, msg, topic=None, + version=None): """rpc.fanout_cast_to_server() a remote method. :param context: The request context :param server_params: Server parameters. See rpc.cast_to_server() for details. :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. :param version: (Optional) Override the requested API version in this message. @@ -158,4 +161,5 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) - rpc.fanout_cast_to_server(context, server_params, self.topic, msg) + rpc.fanout_cast_to_server(context, server_params, + self._get_topic(topic), msg) diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index 5eeaf70aa..4416a3b19 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -21,7 +21,6 @@ Time related utilities and helper functions. import calendar import datetime -import time import iso8601