From 5cb2f9caba3ea9fb55d5964bfb311d027f0e1a3a Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Wed, 1 May 2013 18:16:48 +1000 Subject: [PATCH] Update oslo before bringing in exceptions I want to bring in the base OpenstackException for the alarm api, so first updating what we currently have. Change-Id: I8128f3420eac9ab42a9d679c7b291bac1f27e5b0 --- ceilometer/openstack/common/context.py | 5 +- ceilometer/openstack/common/jsonutils.py | 56 ++++++-- ceilometer/openstack/common/log.py | 52 ++++--- ceilometer/openstack/common/loopingcall.py | 4 +- ceilometer/openstack/common/network_utils.py | 3 +- ceilometer/openstack/common/policy.py | 2 +- ceilometer/openstack/common/processutils.py | 135 ++++++++++++++++++ ceilometer/openstack/common/rpc/__init__.py | 2 +- ceilometer/openstack/common/rpc/amqp.py | 11 +- ceilometer/openstack/common/rpc/common.py | 4 +- ceilometer/openstack/common/rpc/dispatcher.py | 21 ++- ceilometer/openstack/common/rpc/impl_fake.py | 12 +- ceilometer/openstack/common/rpc/impl_kombu.py | 8 +- ceilometer/openstack/common/rpc/impl_qpid.py | 2 +- ceilometer/openstack/common/rpc/impl_zmq.py | 7 +- ceilometer/openstack/common/rpc/proxy.py | 6 +- .../openstack/common/rpc/zmq_receiver.py | 41 ++++++ ceilometer/openstack/common/uuidutils.py | 39 +++++ 18 files changed, 349 insertions(+), 61 deletions(-) create mode 100644 ceilometer/openstack/common/processutils.py create mode 100755 ceilometer/openstack/common/rpc/zmq_receiver.py create mode 100644 ceilometer/openstack/common/uuidutils.py diff --git a/ceilometer/openstack/common/context.py b/ceilometer/openstack/common/context.py index e9cfd73cc..16c98276c 100644 --- a/ceilometer/openstack/common/context.py +++ b/ceilometer/openstack/common/context.py @@ -23,11 +23,12 @@ context or provide additional information in their specific WSGI pipeline. """ import itertools -import uuid + +from ceilometer.openstack.common import uuidutils def generate_request_id(): - return 'req-' + str(uuid.uuid4()) + return 'req-%s' % uuidutils.generate_uuid() class RequestContext(object): diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index ab3620c3b..7e8ace2fa 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -38,11 +38,21 @@ import functools import inspect import itertools import json +import types import xmlrpclib from ceilometer.openstack.common import timeutils +_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, + inspect.isfunction, inspect.isgeneratorfunction, + inspect.isgenerator, inspect.istraceback, inspect.isframe, + inspect.iscode, inspect.isbuiltin, inspect.isroutine, + inspect.isabstract] + +_simple_types = (types.NoneType, int, basestring, bool, float, long) + + def to_primitive(value, convert_instances=False, convert_datetime=True, level=0, max_depth=3): """Convert a complex object into primitives. @@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, Therefore, convert_instances=True is lossy ... be aware. """ - nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod, - inspect.isfunction, inspect.isgeneratorfunction, - inspect.isgenerator, inspect.istraceback, inspect.isframe, - inspect.iscode, inspect.isbuiltin, inspect.isroutine, - inspect.isabstract] - for test in nasty: - if test(value): - return unicode(value) + # handle obvious types first - order of basic types determined by running + # full tests on nova project, resulting in the following counts: + # 572754 + # 460353 + # 379632 + # 274610 + # 199918 + # 114200 + # 51817 + # 26164 + # 6491 + # 283 + # 19 + if isinstance(value, _simple_types): + return value - # value of itertools.count doesn't get caught by inspects - # above and results in infinite loop when list(value) is called. + if isinstance(value, datetime.datetime): + if convert_datetime: + return timeutils.strtime(value) + else: + return value + + # value of itertools.count doesn't get caught by nasty_type_tests + # and results in infinite loop when list(value) is called. if type(value) == itertools.count: return unicode(value) @@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, convert_datetime=convert_datetime, level=level, max_depth=max_depth) + if isinstance(value, dict): + return dict((k, recursive(v)) for k, v in value.iteritems()) + elif isinstance(value, (list, tuple)): + return [recursive(lv) for lv in value] + # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled if isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) - if isinstance(value, (list, tuple)): - return [recursive(v) for v in value] - elif isinstance(value, dict): - return dict((k, recursive(v)) for k, v in value.iteritems()) - elif convert_datetime and isinstance(value, datetime.datetime): + if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) @@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Ignore class member vars. return recursive(value.__dict__, level=level + 1) else: + if any(test(value) for test in _nasty_type_tests): + return unicode(value) return value except TypeError: # Class objects are tricky since they may define something like diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index 7ee894cca..00cc22c6d 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -37,7 +37,6 @@ import logging import logging.config import logging.handlers import os -import stat import sys import traceback @@ -104,10 +103,7 @@ logging_cli_opts = [ generic_log_opts = [ cfg.BoolOpt('use_stderr', default=True, - help='Log output to standard error'), - cfg.StrOpt('logfile_mode', - default='0644', - help='Default file mode used when creating log files'), + help='Log output to standard error') ] log_opts = [ @@ -211,7 +207,27 @@ def _get_log_file_path(binary=None): return '%s.log' % (os.path.join(logdir, binary),) -class ContextAdapter(logging.LoggerAdapter): +class BaseLoggerAdapter(logging.LoggerAdapter): + + def audit(self, msg, *args, **kwargs): + self.log(logging.AUDIT, msg, *args, **kwargs) + + +class LazyAdapter(BaseLoggerAdapter): + def __init__(self, name='unknown', version='unknown'): + self._logger = None + self.extra = {} + self.name = name + self.version = version + + @property + def logger(self): + if not self._logger: + self._logger = getLogger(self.name, self.version) + return self._logger + + +class ContextAdapter(BaseLoggerAdapter): warn = logging.LoggerAdapter.warning def __init__(self, logger, project_name, version_string): @@ -219,8 +235,9 @@ class ContextAdapter(logging.LoggerAdapter): self.project = project_name self.version = version_string - def audit(self, msg, *args, **kwargs): - self.log(logging.AUDIT, msg, *args, **kwargs) + @property + def handlers(self): + return self.logger.handlers def deprecated(self, msg, *args, **kwargs): stdmsg = _("Deprecated: %s") % msg @@ -340,7 +357,7 @@ class LogConfigError(Exception): def _load_log_config(log_config): try: logging.config.fileConfig(log_config) - except ConfigParser.Error, exc: + except ConfigParser.Error as exc: raise LogConfigError(log_config, str(exc)) @@ -399,11 +416,6 @@ def _setup_logging_from_conf(): filelog = logging.handlers.WatchedFileHandler(logpath) log_root.addHandler(filelog) - mode = int(CONF.logfile_mode, 8) - st = os.stat(logpath) - if st.st_mode != (stat.S_IFREG | mode): - os.chmod(logpath, mode) - if CONF.use_stderr: streamlog = ColorHandler() log_root.addHandler(streamlog) @@ -432,14 +444,11 @@ def _setup_logging_from_conf(): else: log_root.setLevel(logging.WARNING) - level = logging.NOTSET for pair in CONF.default_log_levels: mod, _sep, level_name = pair.partition('=') level = logging.getLevelName(level_name) logger = logging.getLogger(mod) logger.setLevel(level) - for handler in log_root.handlers: - logger.addHandler(handler) _loggers = {} @@ -452,6 +461,15 @@ def getLogger(name='unknown', version='unknown'): return _loggers[name] +def getLazyLogger(name='unknown', version='unknown'): + """ + create a pass-through logger that does not create the real logger + until it is really needed and delegates all calls to the real logger + once it is created + """ + return LazyAdapter(name, version) + + class WritableLogger(object): """A thin wrapper that responds to `write` and logs.""" diff --git a/ceilometer/openstack/common/loopingcall.py b/ceilometer/openstack/common/loopingcall.py index d623e7430..2bc52477b 100644 --- a/ceilometer/openstack/common/loopingcall.py +++ b/ceilometer/openstack/common/loopingcall.py @@ -84,7 +84,7 @@ class FixedIntervalLoopingCall(LoopingCallBase): LOG.warn(_('task run outlasted interval by %s sec') % -delay) greenthread.sleep(delay if delay > 0 else 0) - except LoopingCallDone, e: + except LoopingCallDone as e: self.stop() done.send(e.retvalue) except Exception: @@ -131,7 +131,7 @@ class DynamicLoopingCall(LoopingCallBase): LOG.debug(_('Dynamic looping call sleeping for %.02f ' 'seconds'), idle) greenthread.sleep(idle) - except LoopingCallDone, e: + except LoopingCallDone as e: self.stop() done.send(e.retvalue) except Exception: diff --git a/ceilometer/openstack/common/network_utils.py b/ceilometer/openstack/common/network_utils.py index 5224e01aa..6713d42de 100644 --- a/ceilometer/openstack/common/network_utils.py +++ b/ceilometer/openstack/common/network_utils.py @@ -19,7 +19,8 @@ Network-related utilities and helper functions. """ -import logging +from ceilometer.openstack.common import log as logging + LOG = logging.getLogger(__name__) diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index 155de8b04..04d4e6d9a 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -57,7 +57,6 @@ as it allows particular rules to be explicitly disabled. """ import abc -import logging import re import urllib @@ -65,6 +64,7 @@ import urllib2 from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import jsonutils +from ceilometer.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/ceilometer/openstack/common/processutils.py b/ceilometer/openstack/common/processutils.py new file mode 100644 index 000000000..3541626c3 --- /dev/null +++ b/ceilometer/openstack/common/processutils.py @@ -0,0 +1,135 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +System-level utilities and helper functions. +""" + +import random +import shlex + +from eventlet.green import subprocess +from eventlet import greenthread + +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class UnknownArgumentError(Exception): + def __init__(self, message=None): + super(UnknownArgumentError, self).__init__(message) + + +class ProcessExecutionError(Exception): + def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, + description=None): + if description is None: + description = "Unexpected error while running command." + if exit_code is None: + exit_code = '-' + message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" + % (description, cmd, exit_code, stdout, stderr)) + super(ProcessExecutionError, self).__init__(message) + + +def execute(*cmd, **kwargs): + """ + Helper method to shell out and execute a command through subprocess with + optional retry. + + :param cmd: Passed to subprocess.Popen. + :type cmd: string + :param process_input: Send to opened process. + :type proces_input: string + :param check_exit_code: Defaults to 0. Will raise + :class:`ProcessExecutionError` + if the command exits without returning this value + as a returncode + :type check_exit_code: int + :param delay_on_retry: True | False. Defaults to True. If set to True, + wait a short amount of time before retrying. + :type delay_on_retry: boolean + :param attempts: How many times to retry cmd. + :type attempts: int + :param run_as_root: True | False. Defaults to False. If set to True, + the command is prefixed by the command specified + in the root_helper kwarg. + :type run_as_root: boolean + :param root_helper: command to prefix all cmd's with + :type root_helper: string + :returns: (stdout, stderr) from process execution + :raises: :class:`UnknownArgumentError` on + receiving unknown arguments + :raises: :class:`ProcessExecutionError` + """ + + process_input = kwargs.pop('process_input', None) + check_exit_code = kwargs.pop('check_exit_code', 0) + delay_on_retry = kwargs.pop('delay_on_retry', True) + attempts = kwargs.pop('attempts', 1) + run_as_root = kwargs.pop('run_as_root', False) + root_helper = kwargs.pop('root_helper', '') + if len(kwargs): + raise UnknownArgumentError(_('Got unknown keyword args ' + 'to utils.execute: %r') % kwargs) + if run_as_root: + cmd = shlex.split(root_helper) + list(cmd) + cmd = map(str, cmd) + + while attempts > 0: + attempts -= 1 + try: + LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) + _PIPE = subprocess.PIPE # pylint: disable=E1101 + obj = subprocess.Popen(cmd, + stdin=_PIPE, + stdout=_PIPE, + stderr=_PIPE, + close_fds=True) + result = None + if process_input is not None: + result = obj.communicate(process_input) + else: + result = obj.communicate() + obj.stdin.close() # pylint: disable=E1101 + _returncode = obj.returncode # pylint: disable=E1101 + if _returncode: + LOG.debug(_('Result was %s') % _returncode) + if (isinstance(check_exit_code, int) and + not isinstance(check_exit_code, bool) and + _returncode != check_exit_code): + (stdout, stderr) = result + raise ProcessExecutionError(exit_code=_returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) + return result + except ProcessExecutionError: + if not attempts: + raise + else: + LOG.debug(_('%r failed. Retrying.'), cmd) + if delay_on_retry: + greenthread.sleep(random.randint(20, 200) / 100.0) + finally: + # NOTE(termie): this appears to be necessary to let the subprocess + # call clean something up in between calls, without + # it two execute calls in a row hangs the second one + greenthread.sleep(0) diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index 771f79410..02ee48455 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -26,13 +26,13 @@ For some wrappers that add message versioning to rpc, see: """ import inspect -import logging from oslo.config import cfg from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import local +from ceilometer.openstack.common import log as logging LOG = logging.getLogger(__name__) diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index dbd5b96c8..3261f6504 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 2aa1503f9..92e6c00e1 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -276,7 +276,7 @@ def _safe_log(log_func, msg, msg_data): for elem in arg[:-1]: d = d[elem] d[arg[-1]] = '' - except KeyError, e: + except KeyError as e: LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), {'item': arg, 'err': e}) @@ -419,7 +419,7 @@ class ClientException(Exception): def catch_client_exception(exceptions, func, *args, **kwargs): try: return func(*args, **kwargs) - except Exception, e: + except Exception as e: if type(e) in exceptions: raise ClientException() else: diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py index 927af0db8..33734509b 100644 --- a/ceilometer/openstack/common/rpc/dispatcher.py +++ b/ceilometer/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py index 7c3e69caa..64411855f 100644 --- a/ceilometer/openstack/common/rpc/impl_fake.py +++ b/ceilometer/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index 9e2a34bce..1e25795a3 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -176,7 +176,7 @@ class ConsumerBase(object): """Cancel the consuming from the queue, if it has started""" try: self.queue.cancel(self.tag) - except KeyError, e: + except KeyError as e: # NOTE(comstud): Kludge to get around a amqplib bug if str(e) != "u'%s'" % self.tag: raise @@ -520,7 +520,7 @@ class Connection(object): return except (IOError, self.connection_errors) as e: pass - except Exception, e: + except Exception as e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport # connection_errors in the case of a timeout waiting for @@ -561,10 +561,10 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (self.connection_errors, socket.timeout, IOError), e: + except (self.connection_errors, socket.timeout, IOError) as e: if error_callback: error_callback(e) - except Exception, e: + except Exception as e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport # connection_errors in the case of a timeout waiting for diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 7348c1fba..e52c26eb6 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -346,7 +346,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid_exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError as e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 1b2295c2f..dbf91df29 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -276,12 +276,13 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns pass - except rpc_common.ClientException, e: + except rpc_common.ClientException as e: LOG.debug(_("Expected exception during message handling (%s)") % e._exc_info[1]) return {'exc': @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/ceilometer/openstack/common/rpc/proxy.py b/ceilometer/openstack/common/rpc/proxy.py index 98b1c8edf..192d9f6de 100644 --- a/ceilometer/openstack/common/rpc/proxy.py +++ b/ceilometer/openstack/common/rpc/proxy.py @@ -58,9 +58,13 @@ class RpcProxy(object): """Return the topic to use for a message.""" return topic if topic else self.topic + @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. diff --git a/ceilometer/openstack/common/rpc/zmq_receiver.py b/ceilometer/openstack/common/rpc/zmq_receiver.py new file mode 100755 index 000000000..56195b90b --- /dev/null +++ b/ceilometer/openstack/common/rpc/zmq_receiver.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import sys + +from oslo.config import cfg + +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import rpc +from ceilometer.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.setup("oslo") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() diff --git a/ceilometer/openstack/common/uuidutils.py b/ceilometer/openstack/common/uuidutils.py new file mode 100644 index 000000000..7608acb94 --- /dev/null +++ b/ceilometer/openstack/common/uuidutils.py @@ -0,0 +1,39 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Intel Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +UUID related utilities and helper functions. +""" + +import uuid + + +def generate_uuid(): + return str(uuid.uuid4()) + + +def is_uuid_like(val): + """Returns validation of a value as a UUID. + + For our purposes, a UUID is a canonical form string: + aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa + + """ + try: + return str(uuid.UUID(val)) == val + except (TypeError, ValueError, AttributeError): + return False