Update openstack.common

Change-Id: I952bc668ce10d05944eb0d2b06c8eff917c22af8
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-01-08 16:13:06 +01:00
parent 04e54c4f3e
commit 4c5fc2204a
36 changed files with 1808 additions and 937 deletions

View File

@ -49,7 +49,5 @@ class AdminAuthHook(hooks.PecanHook):
def before(self, state): def before(self, state):
headers = state.request.headers headers = state.request.headers
if not policy.check_is_admin(headers.get('X-Roles', "").split(","), if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
headers.get('X-Tenant-Id'),
headers.get('X-Tenant-Name')):
raise exc.HTTPUnauthorized() raise exc.HTTPUnauthorized()

View File

@ -22,7 +22,5 @@ from ceilometer import policy
def get_limited_to_project(headers): def get_limited_to_project(headers):
"""Return the tenant the request should be limited to.""" """Return the tenant the request should be limited to."""
if not policy.check_is_admin(headers.get('X-Roles', "").split(","), if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
headers.get('X-Tenant-Id'),
headers.get('X-Tenant-Name')):
return headers.get('X-Tenant-Id') return headers.get('X-Tenant-Id')

View File

@ -205,27 +205,11 @@ Option values may reference other values using PEP 292 string substitution::
Note that interpolation can be avoided by using '$$'. Note that interpolation can be avoided by using '$$'.
For command line utilities that dispatch to other command line utilities, the
disable_interspersed_args() method is available. If this this method is called,
then parsing e.g.::
script --verbose cmd --debug /tmp/mything
will no longer return::
['cmd', '/tmp/mything']
as the leftover arguments, but will instead return::
['cmd', '--debug', '/tmp/mything']
i.e. argument parsing is stopped at the first non-option argument.
Options may be declared as required so that an error is raised if the user Options may be declared as required so that an error is raised if the user
does not supply a value for the option. does not supply a value for the option.
Options may be declared as secret so that their values are not leaked into Options may be declared as secret so that their values are not leaked into
log files: log files::
opts = [ opts = [
cfg.StrOpt('s3_store_access_key', secret=True), cfg.StrOpt('s3_store_access_key', secret=True),
@ -233,29 +217,53 @@ log files:
... ...
] ]
This module also contains a global instance of the CommonConfigOpts class This module also contains a global instance of the ConfigOpts class
in order to support a common usage pattern in OpenStack: in order to support a common usage pattern in OpenStack::
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
opts = [ opts = [
cfg.StrOpt('bind_host', default='0.0.0.0'), cfg.StrOpt('bind_host', default='0.0.0.0'),
cfg.IntOpt('bind_port', default=9292), cfg.IntOpt('bind_port', default=9292),
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(opts) CONF.register_opts(opts)
def start(server, app): def start(server, app):
server.start(app, CONF.bind_port, CONF.bind_host) server.start(app, CONF.bind_port, CONF.bind_host)
Positional command line arguments are supported via a 'positional' Opt
constructor argument::
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
True
>>> conf(['a', 'b'])
>>> conf.bar
['a', 'b']
It is also possible to use argparse "sub-parsers" to parse additional
command line arguments using the SubCommandOpt class:
>>> def add_parsers(subparsers):
... list_action = subparsers.add_parser('list')
... list_action.add_argument('id')
...
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
True
>>> conf(args=['list', '10'])
>>> conf.action.name, conf.action.id
('list', '10')
""" """
import argparse
import collections import collections
import copy import copy
import functools import functools
import glob import glob
import optparse
import os import os
import string import string
import sys import sys
@ -474,6 +482,13 @@ def _is_opt_registered(opts, opt):
return False return False
def set_defaults(opts, **kwargs):
for opt in opts:
if opt.dest in kwargs:
opt.default = kwargs[opt.dest]
break
class Opt(object): class Opt(object):
"""Base class for all configuration options. """Base class for all configuration options.
@ -489,6 +504,8 @@ class Opt(object):
a single character CLI option name a single character CLI option name
default: default:
the default value of the option the default value of the option
positional:
True if the option is a positional CLI argument
metavar: metavar:
the name shown as the argument to a CLI option in --help output the name shown as the argument to a CLI option in --help output
help: help:
@ -497,8 +514,8 @@ class Opt(object):
multi = False multi = False
def __init__(self, name, dest=None, short=None, default=None, def __init__(self, name, dest=None, short=None, default=None,
metavar=None, help=None, secret=False, required=False, positional=False, metavar=None, help=None,
deprecated_name=None): secret=False, required=False, deprecated_name=None):
"""Construct an Opt object. """Construct an Opt object.
The only required parameter is the option's name. However, it is The only required parameter is the option's name. However, it is
@ -508,6 +525,7 @@ class Opt(object):
:param dest: the name of the corresponding ConfigOpts property :param dest: the name of the corresponding ConfigOpts property
:param short: a single character CLI option name :param short: a single character CLI option name
:param default: the default value of the option :param default: the default value of the option
:param positional: True if the option is a positional CLI argument
:param metavar: the option argument to show in --help :param metavar: the option argument to show in --help
:param help: an explanation of how the option is used :param help: an explanation of how the option is used
:param secret: true iff the value should be obfuscated in log output :param secret: true iff the value should be obfuscated in log output
@ -521,6 +539,7 @@ class Opt(object):
self.dest = dest self.dest = dest
self.short = short self.short = short
self.default = default self.default = default
self.positional = positional
self.metavar = metavar self.metavar = metavar
self.help = help self.help = help
self.secret = secret self.secret = secret
@ -561,64 +580,73 @@ class Opt(object):
:param parser: the CLI option parser :param parser: the CLI option parser
:param group: an optional OptGroup object :param group: an optional OptGroup object
""" """
container = self._get_optparse_container(parser, group) container = self._get_argparse_container(parser, group)
kwargs = self._get_optparse_kwargs(group) kwargs = self._get_argparse_kwargs(group)
prefix = self._get_optparse_prefix('', group) prefix = self._get_argparse_prefix('', group)
self._add_to_optparse(container, self.name, self.short, kwargs, prefix, self._add_to_argparse(container, self.name, self.short, kwargs, prefix,
self.deprecated_name) self.positional, self.deprecated_name)
def _add_to_optparse(self, container, name, short, kwargs, prefix='', def _add_to_argparse(self, container, name, short, kwargs, prefix='',
deprecated_name=None): positional=False, deprecated_name=None):
"""Add an option to an optparse parser or group. """Add an option to an argparse parser or group.
:param container: an optparse.OptionContainer object :param container: an argparse._ArgumentGroup object
:param name: the opt name :param name: the opt name
:param short: the short opt name :param short: the short opt name
:param kwargs: the keyword arguments for add_option() :param kwargs: the keyword arguments for add_argument()
:param prefix: an optional prefix to prepend to the opt name :param prefix: an optional prefix to prepend to the opt name
:param position: whether the optional is a positional CLI argument
:raises: DuplicateOptError if a naming confict is detected :raises: DuplicateOptError if a naming confict is detected
""" """
args = ['--' + prefix + name] def hyphen(arg):
return arg if not positional else ''
args = [hyphen('--') + prefix + name]
if short: if short:
args += ['-' + short] args.append(hyphen('-') + short)
if deprecated_name: if deprecated_name:
args += ['--' + prefix + deprecated_name] args.append(hyphen('--') + prefix + deprecated_name)
for a in args:
if container.has_option(a):
raise DuplicateOptError(a)
container.add_option(*args, **kwargs)
def _get_optparse_container(self, parser, group): try:
"""Returns an optparse.OptionContainer. container.add_argument(*args, **kwargs)
except argparse.ArgumentError as e:
raise DuplicateOptError(e)
:param parser: an optparse.OptionParser def _get_argparse_container(self, parser, group):
"""Returns an argparse._ArgumentGroup.
:param parser: an argparse.ArgumentParser
:param group: an (optional) OptGroup object :param group: an (optional) OptGroup object
:returns: an optparse.OptionGroup if a group is given, else the parser :returns: an argparse._ArgumentGroup if group is given, else parser
""" """
if group is not None: if group is not None:
return group._get_optparse_group(parser) return group._get_argparse_group(parser)
else: else:
return parser return parser
def _get_optparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
"""Build a dict of keyword arguments for optparse's add_option(). """Build a dict of keyword arguments for argparse's add_argument().
Most opt types extend this method to customize the behaviour of the Most opt types extend this method to customize the behaviour of the
options added to optparse. options added to argparse.
:param group: an optional group :param group: an optional group
:param kwargs: optional keyword arguments to add to :param kwargs: optional keyword arguments to add to
:returns: a dict of keyword arguments :returns: a dict of keyword arguments
""" """
dest = self.dest if not self.positional:
if group is not None: dest = self.dest
dest = group.name + '_' + dest if group is not None:
kwargs.update({'dest': dest, dest = group.name + '_' + dest
kwargs['dest'] = dest
else:
kwargs['nargs'] = '?'
kwargs.update({'default': None,
'metavar': self.metavar, 'metavar': self.metavar,
'help': self.help, }) 'help': self.help, })
return kwargs return kwargs
def _get_optparse_prefix(self, prefix, group): def _get_argparse_prefix(self, prefix, group):
"""Build a prefix for the CLI option name, if required. """Build a prefix for the CLI option name, if required.
CLI options in a group are prefixed with the group's name in order CLI options in a group are prefixed with the group's name in order
@ -656,6 +684,11 @@ class BoolOpt(Opt):
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True, _boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False} '0': False, 'no': False, 'false': False, 'off': False}
def __init__(self, *args, **kwargs):
if 'positional' in kwargs:
raise ValueError('positional boolean args not supported')
super(BoolOpt, self).__init__(*args, **kwargs)
def _get_from_config_parser(self, cparser, section): def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a boolean from ConfigParser.""" """Retrieve the opt value as a boolean from ConfigParser."""
def convert_bool(v): def convert_bool(v):
@ -671,21 +704,32 @@ class BoolOpt(Opt):
def _add_to_cli(self, parser, group=None): def _add_to_cli(self, parser, group=None):
"""Extends the base class method to add the --nooptname option.""" """Extends the base class method to add the --nooptname option."""
super(BoolOpt, self)._add_to_cli(parser, group) super(BoolOpt, self)._add_to_cli(parser, group)
self._add_inverse_to_optparse(parser, group) self._add_inverse_to_argparse(parser, group)
def _add_inverse_to_optparse(self, parser, group): def _add_inverse_to_argparse(self, parser, group):
"""Add the --nooptname option to the option parser.""" """Add the --nooptname option to the option parser."""
container = self._get_optparse_container(parser, group) container = self._get_argparse_container(parser, group)
kwargs = self._get_optparse_kwargs(group, action='store_false') kwargs = self._get_argparse_kwargs(group, action='store_false')
prefix = self._get_optparse_prefix('no', group) prefix = self._get_argparse_prefix('no', group)
kwargs["help"] = "The inverse of --" + self.name kwargs["help"] = "The inverse of --" + self.name
self._add_to_optparse(container, self.name, None, kwargs, prefix, self._add_to_argparse(container, self.name, None, kwargs, prefix,
self.deprecated_name) self.positional, self.deprecated_name)
def _get_optparse_kwargs(self, group, action='store_true', **kwargs): def _get_argparse_kwargs(self, group, action='store_true', **kwargs):
"""Extends the base optparse keyword dict for boolean options.""" """Extends the base argparse keyword dict for boolean options."""
return super(BoolOpt,
self)._get_optparse_kwargs(group, action=action, **kwargs) kwargs = super(BoolOpt, self)._get_argparse_kwargs(group, **kwargs)
# metavar has no effect for BoolOpt
if 'metavar' in kwargs:
del kwargs['metavar']
if action != 'store_true':
action = 'store_false'
kwargs['action'] = action
return kwargs
class IntOpt(Opt): class IntOpt(Opt):
@ -697,10 +741,10 @@ class IntOpt(Opt):
return [int(v) for v in self._cparser_get_with_deprecated(cparser, return [int(v) for v in self._cparser_get_with_deprecated(cparser,
section)] section)]
def _get_optparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for integer options.""" """Extends the base argparse keyword dict for integer options."""
return super(IntOpt, return super(IntOpt,
self)._get_optparse_kwargs(group, type='int', **kwargs) self)._get_argparse_kwargs(group, type=int, **kwargs)
class FloatOpt(Opt): class FloatOpt(Opt):
@ -712,10 +756,10 @@ class FloatOpt(Opt):
return [float(v) for v in return [float(v) for v in
self._cparser_get_with_deprecated(cparser, section)] self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for float options.""" """Extends the base argparse keyword dict for float options."""
return super(FloatOpt, return super(FloatOpt, self)._get_argparse_kwargs(group,
self)._get_optparse_kwargs(group, type='float', **kwargs) type=float, **kwargs)
class ListOpt(Opt): class ListOpt(Opt):
@ -725,23 +769,26 @@ class ListOpt(Opt):
is a list containing these strings. is a list containing these strings.
""" """
class _StoreListAction(argparse.Action):
"""
An argparse action for parsing an option value into a list.
"""
def __call__(self, parser, namespace, values, option_string=None):
if values is not None:
values = [a.strip() for a in values.split(',')]
setattr(namespace, self.dest, values)
def _get_from_config_parser(self, cparser, section): def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser.""" """Retrieve the opt value as a list from ConfigParser."""
return [v.split(',') for v in return [[a.strip() for a in v.split(',')] for v in
self._cparser_get_with_deprecated(cparser, section)] self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for list options.""" """Extends the base argparse keyword dict for list options."""
return super(ListOpt, return Opt._get_argparse_kwargs(self,
self)._get_optparse_kwargs(group, group,
type='string', action=ListOpt._StoreListAction,
action='callback', **kwargs)
callback=self._parse_list,
**kwargs)
def _parse_list(self, option, opt, value, parser):
"""An optparse callback for parsing an option value into a list."""
setattr(parser.values, self.dest, value.split(','))
class MultiStrOpt(Opt): class MultiStrOpt(Opt):
@ -752,10 +799,14 @@ class MultiStrOpt(Opt):
""" """
multi = True multi = True
def _get_optparse_kwargs(self, group, **kwargs): def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for multi str options.""" """Extends the base argparse keyword dict for multi str options."""
return super(MultiStrOpt, kwargs = super(MultiStrOpt, self)._get_argparse_kwargs(group)
self)._get_optparse_kwargs(group, action='append') if not self.positional:
kwargs['action'] = 'append'
else:
kwargs['nargs'] = '*'
return kwargs
def _cparser_get_with_deprecated(self, cparser, section): def _cparser_get_with_deprecated(self, cparser, section):
"""If cannot find option as dest try deprecated_name alias.""" """If cannot find option as dest try deprecated_name alias."""
@ -765,6 +816,57 @@ class MultiStrOpt(Opt):
return cparser.get(section, [self.dest], multi=True) return cparser.get(section, [self.dest], multi=True)
class SubCommandOpt(Opt):
"""
Sub-command options allow argparse sub-parsers to be used to parse
additional command line arguments.
The handler argument to the SubCommandOpt contructor is a callable
which is supplied an argparse subparsers object. Use this handler
callable to add sub-parsers.
The opt value is SubCommandAttr object with the name of the chosen
sub-parser stored in the 'name' attribute and the values of other
sub-parser arguments available as additional attributes.
"""
def __init__(self, name, dest=None, handler=None,
title=None, description=None, help=None):
"""Construct an sub-command parsing option.
This behaves similarly to other Opt sub-classes but adds a
'handler' argument. The handler is a callable which is supplied
an subparsers object when invoked. The add_parser() method on
this subparsers object can be used to register parsers for
sub-commands.
:param name: the option's name
:param dest: the name of the corresponding ConfigOpts property
:param title: title of the sub-commands group in help output
:param description: description of the group in help output
:param help: a help string giving an overview of available sub-commands
"""
super(SubCommandOpt, self).__init__(name, dest=dest, help=help)
self.handler = handler
self.title = title
self.description = description
def _add_to_cli(self, parser, group=None):
"""Add argparse sub-parsers and invoke the handler method."""
dest = self.dest
if group is not None:
dest = group.name + '_' + dest
subparsers = parser.add_subparsers(dest=dest,
title=self.title,
description=self.description,
help=self.help)
if not self.handler is None:
self.handler(subparsers)
class OptGroup(object): class OptGroup(object):
""" """
@ -800,19 +902,20 @@ class OptGroup(object):
self.help = help self.help = help
self._opts = {} # dict of dicts of (opt:, override:, default:) self._opts = {} # dict of dicts of (opt:, override:, default:)
self._optparse_group = None self._argparse_group = None
def _register_opt(self, opt): def _register_opt(self, opt, cli=False):
"""Add an opt to this group. """Add an opt to this group.
:param opt: an Opt object :param opt: an Opt object
:param cli: whether this is a CLI option
:returns: False if previously registered, True otherwise :returns: False if previously registered, True otherwise
:raises: DuplicateOptError if a naming conflict is detected :raises: DuplicateOptError if a naming conflict is detected
""" """
if _is_opt_registered(self._opts, opt): if _is_opt_registered(self._opts, opt):
return False return False
self._opts[opt.dest] = {'opt': opt} self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True return True
@ -824,16 +927,16 @@ class OptGroup(object):
if opt.dest in self._opts: if opt.dest in self._opts:
del self._opts[opt.dest] del self._opts[opt.dest]
def _get_optparse_group(self, parser): def _get_argparse_group(self, parser):
"""Build an optparse.OptionGroup for this group.""" if self._argparse_group is None:
if self._optparse_group is None: """Build an argparse._ArgumentGroup for this group."""
self._optparse_group = optparse.OptionGroup(parser, self.title, self._argparse_group = parser.add_argument_group(self.title,
self.help) self.help)
return self._optparse_group return self._argparse_group
def _clear(self): def _clear(self):
"""Clear this group's option parsing state.""" """Clear this group's option parsing state."""
self._optparse_group = None self._argparse_group = None
class ParseError(iniparser.ParseError): class ParseError(iniparser.ParseError):
@ -928,26 +1031,31 @@ class ConfigOpts(collections.Mapping):
self._groups = {} self._groups = {}
self._args = None self._args = None
self._oparser = None self._oparser = None
self._cparser = None self._cparser = None
self._cli_values = {} self._cli_values = {}
self.__cache = {} self.__cache = {}
self._config_opts = [] self._config_opts = []
self._disable_interspersed_args = False
def _setup(self, project, prog, version, usage, default_config_files): def _pre_setup(self, project, prog, version, usage, default_config_files):
"""Initialize a ConfigOpts object for option parsing.""" """Initialize a ConfigCliParser object for option parsing."""
if prog is None: if prog is None:
prog = os.path.basename(sys.argv[0]) prog = os.path.basename(sys.argv[0])
if default_config_files is None: if default_config_files is None:
default_config_files = find_config_files(project, prog) default_config_files = find_config_files(project, prog)
self._oparser = optparse.OptionParser(prog=prog, self._oparser = argparse.ArgumentParser(prog=prog, usage=usage)
version=version, self._oparser.add_argument('--version',
usage=usage) action='version',
if self._disable_interspersed_args: version=version)
self._oparser.disable_interspersed_args()
return prog, default_config_files
def _setup(self, project, prog, version, usage, default_config_files):
"""Initialize a ConfigOpts object for option parsing."""
self._config_opts = [ self._config_opts = [
MultiStrOpt('config-file', MultiStrOpt('config-file',
@ -1017,18 +1125,23 @@ class ConfigOpts(collections.Mapping):
:raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError, :raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError,
RequiredOptError, DuplicateOptError RequiredOptError, DuplicateOptError
""" """
self.clear() self.clear()
prog, default_config_files = self._pre_setup(project,
prog,
version,
usage,
default_config_files)
self._setup(project, prog, version, usage, default_config_files) self._setup(project, prog, version, usage, default_config_files)
self._cli_values, leftovers = self._parse_cli_opts(args) self._cli_values = self._parse_cli_opts(args)
self._parse_config_files() self._parse_config_files()
self._check_required_opts() self._check_required_opts()
return leftovers
def __getattr__(self, name): def __getattr__(self, name):
"""Look up an option value and perform string substitution. """Look up an option value and perform string substitution.
@ -1062,17 +1175,21 @@ class ConfigOpts(collections.Mapping):
@__clear_cache @__clear_cache
def clear(self): def clear(self):
"""Clear the state of the object to before it was called.""" """Clear the state of the object to before it was called.
Any subparsers added using the add_cli_subparsers() will also be
removed as a side-effect of this method.
"""
self._args = None self._args = None
self._cli_values.clear() self._cli_values.clear()
self._oparser = None self._oparser = argparse.ArgumentParser()
self._cparser = None self._cparser = None
self.unregister_opts(self._config_opts) self.unregister_opts(self._config_opts)
for group in self._groups.values(): for group in self._groups.values():
group._clear() group._clear()
@__clear_cache @__clear_cache
def register_opt(self, opt, group=None): def register_opt(self, opt, group=None, cli=False):
"""Register an option schema. """Register an option schema.
Registering an option schema makes any option value which is previously Registering an option schema makes any option value which is previously
@ -1080,17 +1197,19 @@ class ConfigOpts(collections.Mapping):
as an attribute of this object. as an attribute of this object.
:param opt: an instance of an Opt sub-class :param opt: an instance of an Opt sub-class
:param cli: whether this is a CLI option
:param group: an optional OptGroup object or group name :param group: an optional OptGroup object or group name
:return: False if the opt was already register, True otherwise :return: False if the opt was already register, True otherwise
:raises: DuplicateOptError :raises: DuplicateOptError
""" """
if group is not None: if group is not None:
return self._get_group(group, autocreate=True)._register_opt(opt) group = self._get_group(group, autocreate=True)
return group._register_opt(opt, cli)
if _is_opt_registered(self._opts, opt): if _is_opt_registered(self._opts, opt):
return False return False
self._opts[opt.dest] = {'opt': opt} self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True return True
@ -1116,7 +1235,7 @@ class ConfigOpts(collections.Mapping):
if self._args is not None: if self._args is not None:
raise ArgsAlreadyParsedError("cannot register CLI option") raise ArgsAlreadyParsedError("cannot register CLI option")
return self.register_opt(opt, group, clear_cache=False) return self.register_opt(opt, group, cli=True, clear_cache=False)
@__clear_cache @__clear_cache
def register_cli_opts(self, opts, group=None): def register_cli_opts(self, opts, group=None):
@ -1243,10 +1362,11 @@ class ConfigOpts(collections.Mapping):
for info in group._opts.values(): for info in group._opts.values():
yield info, group yield info, group
def _all_opts(self): def _all_cli_opts(self):
"""A generator function for iteration opts.""" """A generator function for iterating CLI opts."""
for info, group in self._all_opt_infos(): for info, group in self._all_opt_infos():
yield info['opt'], group if info['cli']:
yield info['opt'], group
def _unset_defaults_and_overrides(self): def _unset_defaults_and_overrides(self):
"""Unset any default or override on all options.""" """Unset any default or override on all options."""
@ -1254,31 +1374,6 @@ class ConfigOpts(collections.Mapping):
info.pop('default', None) info.pop('default', None)
info.pop('override', None) info.pop('override', None)
def disable_interspersed_args(self):
"""Set parsing to stop on the first non-option.
If this this method is called, then parsing e.g.
script --verbose cmd --debug /tmp/mything
will no longer return:
['cmd', '/tmp/mything']
as the leftover arguments, but will instead return:
['cmd', '--debug', '/tmp/mything']
i.e. argument parsing is stopped at the first non-option argument.
"""
self._disable_interspersed_args = True
def enable_interspersed_args(self):
"""Set parsing to not stop on the first non-option.
This it the default behaviour."""
self._disable_interspersed_args = False
def find_file(self, name): def find_file(self, name):
"""Locate a file located alongside the config files. """Locate a file located alongside the config files.
@ -1377,6 +1472,9 @@ class ConfigOpts(collections.Mapping):
info = self._get_opt_info(name, group) info = self._get_opt_info(name, group)
opt = info['opt'] opt = info['opt']
if isinstance(opt, SubCommandOpt):
return self.SubCommandAttr(self, group, opt.dest)
if 'override' in info: if 'override' in info:
return info['override'] return info['override']
@ -1401,6 +1499,10 @@ class ConfigOpts(collections.Mapping):
if not opt.multi: if not opt.multi:
return value return value
# argparse ignores default=None for nargs='*'
if opt.positional and not value:
value = opt.default
return value + values return value + values
if values: if values:
@ -1523,12 +1625,10 @@ class ConfigOpts(collections.Mapping):
""" """
self._args = args self._args = args
for opt, group in self._all_opts(): for opt, group in self._all_cli_opts():
opt._add_to_cli(self._oparser, group) opt._add_to_cli(self._oparser, group)
values, leftovers = self._oparser.parse_args(args) return vars(self._oparser.parse_args(args))
return vars(values), leftovers
class GroupAttr(collections.Mapping): class GroupAttr(collections.Mapping):
@ -1543,12 +1643,12 @@ class ConfigOpts(collections.Mapping):
:param conf: a ConfigOpts object :param conf: a ConfigOpts object
:param group: an OptGroup object :param group: an OptGroup object
""" """
self.conf = conf self._conf = conf
self.group = group self._group = group
def __getattr__(self, name): def __getattr__(self, name):
"""Look up an option value and perform template substitution.""" """Look up an option value and perform template substitution."""
return self.conf._get(name, self.group) return self._conf._get(name, self._group)
def __getitem__(self, key): def __getitem__(self, key):
"""Look up an option value and perform string substitution.""" """Look up an option value and perform string substitution."""
@ -1556,16 +1656,50 @@ class ConfigOpts(collections.Mapping):
def __contains__(self, key): def __contains__(self, key):
"""Return True if key is the name of a registered opt or group.""" """Return True if key is the name of a registered opt or group."""
return key in self.group._opts return key in self._group._opts
def __iter__(self): def __iter__(self):
"""Iterate over all registered opt and group names.""" """Iterate over all registered opt and group names."""
for key in self.group._opts.keys(): for key in self._group._opts.keys():
yield key yield key
def __len__(self): def __len__(self):
"""Return the number of options and option groups.""" """Return the number of options and option groups."""
return len(self.group._opts) return len(self._group._opts)
class SubCommandAttr(object):
"""
A helper class representing the name and arguments of an argparse
sub-parser.
"""
def __init__(self, conf, group, dest):
"""Construct a SubCommandAttr object.
:param conf: a ConfigOpts object
:param group: an OptGroup object
:param dest: the name of the sub-parser
"""
self._conf = conf
self._group = group
self._dest = dest
def __getattr__(self, name):
"""Look up a sub-parser name or argument value."""
if name == 'name':
name = self._dest
if self._group is not None:
name = self._group.name + '_' + name
return self._conf._cli_values[name]
if name in self._conf:
raise DuplicateOptError(name)
try:
return self._conf._cli_values[name]
except KeyError:
raise NoSuchOptError(name)
class StrSubWrapper(object): class StrSubWrapper(object):
@ -1594,60 +1728,4 @@ class ConfigOpts(collections.Mapping):
return value return value
class CommonConfigOpts(ConfigOpts): CONF = ConfigOpts()
DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
BoolOpt('debug',
short='d',
default=False,
help='Print debugging output'),
BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output'),
]
logging_cli_opts = [
StrOpt('log-config',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
'documentation for details on logging configuration '
'files.'),
StrOpt('log-format',
default=DEFAULT_LOG_FORMAT,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %default'),
StrOpt('log-date-format',
default=DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
help='Format string for %(asctime)s in log records. '
'Default: %default'),
StrOpt('log-file',
metavar='PATH',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
StrOpt('log-dir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --logfile)'),
BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
]
def __init__(self):
super(CommonConfigOpts, self).__init__()
self.register_cli_opts(self.common_cli_opts)
self.register_cli_opts(self.logging_cli_opts)
CONF = CommonConfigOpts()

View File

@ -46,7 +46,7 @@ def _find_objects(t):
def _print_greenthreads(): def _print_greenthreads():
for i, gt in enumerate(find_objects(greenlet.greenlet)): for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt print i, gt
traceback.print_stack(gt.gr_frame) traceback.print_stack(gt.gr_frame)
print print

View File

@ -24,6 +24,8 @@ import logging
import sys import sys
import traceback import traceback
from ceilometer.openstack.common.gettextutils import _
@contextlib.contextmanager @contextlib.contextmanager
def save_and_reraise_exception(): def save_and_reraise_exception():
@ -43,7 +45,7 @@ def save_and_reraise_exception():
try: try:
yield yield
except Exception: except Exception:
logging.error('Original exception being dropped: %s' % logging.error(_('Original exception being dropped: %s'),
(traceback.format_exception(type_, value, tb))) traceback.format_exception(type_, value, tb))
raise raise
raise type_, value, tb raise type_, value, tb

View File

@ -29,7 +29,7 @@ def import_class(import_str):
try: try:
__import__(mod_str) __import__(mod_str)
return getattr(sys.modules[mod_str], class_str) return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError), exc: except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' % raise ImportError('Class %s cannot be found (%s)' %
(class_str, (class_str,
traceback.format_exception(*sys.exc_info()))) traceback.format_exception(*sys.exc_info())))
@ -57,3 +57,11 @@ def import_module(import_str):
"""Import a module.""" """Import a module."""
__import__(import_str) __import__(import_str)
return sys.modules[import_str] return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level + 1) level=level + 1)
else: else:
return value return value
except TypeError, e: except TypeError:
# Class objects are tricky since they may define something like # Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list(). # __iter__ defined but it isn't callable as list().
return unicode(value) return unicode(value)

View File

@ -47,21 +47,83 @@ from ceilometer.openstack.common import local
from ceilometer.openstack.common import notifier from ceilometer.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
default=False,
help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).'),
cfg.BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).'),
]
logging_cli_opts = [
cfg.StrOpt('log-config',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
help='Format string for %%(asctime)s in log records. '
'Default: %(default)s'),
cfg.StrOpt('log-file',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --log-file)'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
cfg.StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
]
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
]
log_opts = [ log_opts = [
cfg.StrOpt('logging_context_format_string', cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'%(user)s %(tenant)s] %(instance)s' '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s', '%(message)s',
help='format string to use for log messages with context'), help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string', cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
' %(instance)s%(message)s', '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'), help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix', cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d', default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'), help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix', cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'), help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels', cfg.ListOpt('default_log_levels',
default=[ default=[
@ -93,24 +155,9 @@ log_opts = [
'format it like this'), 'format it like this'),
] ]
generic_log_opts = [
cfg.StrOpt('logdir',
default=None,
help='Log output to a per-service log file in named directory'),
cfg.StrOpt('logfile',
default=None,
help='Log output to a named file'),
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_cli_opts(common_cli_opts)
CONF.register_cli_opts(logging_cli_opts)
CONF.register_opts(generic_log_opts) CONF.register_opts(generic_log_opts)
CONF.register_opts(log_opts) CONF.register_opts(log_opts)
@ -148,8 +195,8 @@ def _get_binary_name():
def _get_log_file_path(binary=None): def _get_log_file_path(binary=None):
logfile = CONF.log_file or CONF.logfile logfile = CONF.log_file
logdir = CONF.log_dir or CONF.logdir logdir = CONF.log_dir
if logfile and not logdir: if logfile and not logdir:
return logfile return logfile
@ -174,7 +221,7 @@ class ContextAdapter(logging.LoggerAdapter):
self.log(logging.AUDIT, msg, *args, **kwargs) self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs): def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated Config: %s") % msg stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations: if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs) self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg) raise DeprecatedConfig(msg=stdmsg)
@ -289,6 +336,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name) _setup_logging_from_conf(product_name)
def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts,
logging_context_format_string=
logging_context_format_string)
def _find_facility_from_conf(): def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler, facility = getattr(logging.handlers.SysLogHandler,
@ -354,10 +407,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt)) datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug: if CONF.debug:
log_root.setLevel(logging.DEBUG) log_root.setLevel(logging.DEBUG)
else: elif CONF.verbose:
log_root.setLevel(logging.INFO) log_root.setLevel(logging.INFO)
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET level = logging.NOTSET
for pair in CONF.default_log_levels: for pair in CONF.default_log_levels:

View File

@ -24,6 +24,7 @@ from eventlet import greenthread
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -62,10 +63,16 @@ class LoopingCall(object):
try: try:
while self._running: while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw) self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running: if not self._running:
break break
greenthread.sleep(interval) delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e: except LoopingCallDone, e:
self.stop() self.stop()
done.send(e.retvalue) done.send(e.retvalue)

View File

@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers(): for driver in _get_drivers():
try: try:
driver.notify(context, msg) driver.notify(context, msg)
except Exception, e: except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to " LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. " "send to notification system. "
"Payload=%(payload)s") % locals()) "Payload=%(payload)s")
% dict(e=e, payload=payload))
_drivers = None _drivers = None
@ -166,7 +167,7 @@ def add_driver(notification_driver):
try: try:
driver = importutils.import_module(notification_driver) driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver _drivers[notification_driver] = driver
except ImportError as e: except ImportError:
LOG.exception(_("Failed to load notifier %s. " LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") % "These notifications will not be sent.") %
notification_driver) notification_driver)

View File

@ -41,6 +41,6 @@ def notify(context, message):
topic = '%s.%s' % (topic, priority) topic = '%s.%s' % (topic, priority)
try: try:
rpc.notify(context, topic, message) rpc.notify(context, topic, message)
except Exception, e: except Exception:
LOG.exception(_("Could not send notification to %(topic)s. " LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals()) "Payload=%(message)s"), locals())

View File

@ -0,0 +1,51 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''messaging based notification driver, with message envelopes'''
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import context as req_context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')
CONF = cfg.CONF
CONF.register_group(opt_group)
CONF.register_opt(notification_topic_opt, opt_group)
def notify(context, message):
"""Sends a notification via RPC"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
for topic in CONF.rpc_notifier2.topics:
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC. # Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,10 +15,52 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Common Policy Engine Implementation""" """
Common Policy Engine Implementation
Policies can be expressed in one of two forms: A list of lists, or a
string written in the new policy language.
In the list-of-lists representation, each check inside the innermost
list is combined as with an "and" conjunction--for that check to pass,
all the specified checks must pass. These innermost lists are then
combined as with an "or" conjunction. This is the original way of
expressing policies, but there now exists a new way: the policy
language.
In the policy language, each check is specified the same way as in the
list-of-lists representation: a simple "a:b" pair that is matched to
the correct code to perform that check. However, conjunction
operators are available, allowing for more expressiveness in crafting
policies.
As an example, take the following rule, expressed in the list-of-lists
representation::
[["role:admin"], ["project_id:%(project_id)s", "role:projectadmin"]]
In the policy language, this becomes::
role:admin or (project_id:%(project_id)s and role:projectadmin)
The policy language also has the "not" operator, allowing a richer
policy rule::
project_id:%(project_id)s and not role:dunce
Finally, two special policy checks should be mentioned; the policy
check "@" will always accept an access, and the policy check "!" will
always reject an access. (Note that if a rule is either the empty
list ("[]") or the empty string, this is equivalent to the "@" policy
check.) Of these, the "!" policy check is probably the most useful,
as it allows particular rules to be explicitly disabled.
"""
import abc
import logging import logging
import re
import urllib import urllib
import urllib2 import urllib2
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
@ -28,218 +70,650 @@ from ceilometer.openstack.common import jsonutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_BRAIN = None _rules = None
_checks = {}
def set_brain(brain): class Rules(dict):
"""Set the brain used by enforce().
Defaults use Brain() if not set.
""" """
global _BRAIN A store for rules. Handles the default_rule setting directly.
_BRAIN = brain
def reset():
"""Clear the brain used by enforce()."""
global _BRAIN
_BRAIN = None
def enforce(match_list, target_dict, credentials_dict, exc=None,
*args, **kwargs):
"""Enforces authorization of some rules against credentials.
:param match_list: nested tuples of data to match against
The basic brain supports three types of match lists:
1) rules
looks like: ``('rule:compute:get_instance',)``
Retrieves the named rule from the rules dict and recursively
checks against the contents of the rule.
2) roles
looks like: ``('role:compute:admin',)``
Matches if the specified role is in credentials_dict['roles'].
3) generic
looks like: ``('tenant_id:%(tenant_id)s',)``
Substitutes values from the target dict into the match using
the % operator and matches them against the creds dict.
Combining rules:
The brain returns True if any of the outer tuple of rules
match and also True if all of the inner tuples match. You
can use this to perform simple boolean logic. For
example, the following rule would return True if the creds
contain the role 'admin' OR the if the tenant_id matches
the target dict AND the the creds contains the role
'compute_sysadmin':
::
{
"rule:combined": (
'role:admin',
('tenant_id:%(tenant_id)s', 'role:compute_sysadmin')
)
}
Note that rule and role are reserved words in the credentials match, so
you can't match against properties with those names. Custom brains may
also add new reserved words. For example, the HttpBrain adds http as a
reserved word.
:param target_dict: dict of object properties
Target dicts contain as much information as we can about the object being
operated on.
:param credentials_dict: dict of actor properties
Credentials dicts contain as much information as we can about the user
performing the action.
:param exc: exception to raise
Class of the exception to raise if the check fails. Any remaining
arguments passed to enforce() (both positional and keyword arguments)
will be passed to the exception class. If exc is not provided, returns
False.
:return: True if the policy allows the action
:return: False if the policy does not allow the action and exc is not set
""" """
global _BRAIN
if not _BRAIN:
_BRAIN = Brain()
if not _BRAIN.check(match_list, target_dict, credentials_dict):
if exc:
raise exc(*args, **kwargs)
return False
return True
class Brain(object):
"""Implements policy checking."""
_checks = {}
@classmethod
def _register(cls, name, func):
cls._checks[name] = func
@classmethod @classmethod
def load_json(cls, data, default_rule=None): def load_json(cls, data, default_rule=None):
"""Init a brain using json instead of a rules dictionary.""" """
rules_dict = jsonutils.loads(data) Allow loading of JSON rule data.
return cls(rules=rules_dict, default_rule=default_rule) """
# Suck in the JSON data and parse the rules
rules = dict((k, parse_rule(v)) for k, v in
jsonutils.loads(data).items())
return cls(rules, default_rule)
def __init__(self, rules=None, default_rule=None): def __init__(self, rules=None, default_rule=None):
if self.__class__ != Brain: """Initialize the Rules store."""
LOG.warning(_("Inheritance-based rules are deprecated; use "
"the default brain instead of %s.") %
self.__class__.__name__)
self.rules = rules or {} super(Rules, self).__init__(rules or {})
self.default_rule = default_rule self.default_rule = default_rule
def add_rule(self, key, match): def __missing__(self, key):
self.rules[key] = match """Implements the default rule handling."""
def _check(self, match, target_dict, cred_dict): # If the default rule isn't actually defined, do something
# reasonably intelligent
if not self.default_rule or self.default_rule not in self:
raise KeyError(key)
return self[self.default_rule]
def __str__(self):
"""Dumps a string representation of the rules."""
# Start by building the canonical strings for the rules
out_rules = {}
for key, value in self.items():
# Use empty string for singleton TrueCheck instances
if isinstance(value, TrueCheck):
out_rules[key] = ''
else:
out_rules[key] = str(value)
# Dump a pretty-printed JSON representation
return jsonutils.dumps(out_rules, indent=4)
# Really have to figure out a way to deprecate this
def set_rules(rules):
"""Set the rules in use for policy checks."""
global _rules
_rules = rules
# Ditto
def reset():
"""Clear the rules used for policy checks."""
global _rules
_rules = None
def check(rule, target, creds, exc=None, *args, **kwargs):
"""
Checks authorization of a rule against the target and credentials.
:param rule: The rule to evaluate.
:param target: As much information about the object being operated
on as possible, as a dictionary.
:param creds: As much information about the user performing the
action as possible, as a dictionary.
:param exc: Class of the exception to raise if the check fails.
Any remaining arguments passed to check() (both
positional and keyword arguments) will be passed to
the exception class. If exc is not provided, returns
False.
:return: Returns False if the policy does not allow the action and
exc is not provided; otherwise, returns a value that
evaluates to True. Note: for rules using the "case"
expression, this True value will be the specified string
from the expression.
"""
# Allow the rule to be a Check tree
if isinstance(rule, BaseCheck):
result = rule(target, creds)
elif not _rules:
# No rules to reference means we're going to fail closed
result = False
else:
try: try:
match_kind, match_value = match.split(':', 1) # Evaluate the rule
except Exception: result = _rules[rule](target, creds)
LOG.exception(_("Failed to understand rule %(match)r") % locals()) except KeyError:
# If the rule is invalid, fail closed # If the rule doesn't exist, fail closed
return False result = False
func = None # If it is False, raise the exception if requested
try: if exc and result is False:
old_func = getattr(self, '_check_%s' % match_kind) raise exc(*args, **kwargs)
except AttributeError:
func = self._checks.get(match_kind, self._checks.get(None, None))
else:
LOG.warning(_("Inheritance-based rules are deprecated; update "
"_check_%s") % match_kind)
func = lambda brain, kind, value, target, cred: old_func(value,
target,
cred)
if not func: return result
LOG.error(_("No handler for matches of kind %s") % match_kind)
# Fail closed
return False
return func(self, match_kind, match_value, target_dict, cred_dict)
def check(self, match_list, target_dict, cred_dict): class BaseCheck(object):
"""Checks authorization of some rules against credentials. """
Abstract base class for Check classes.
"""
Detailed description of the check with examples in policy.enforce(). __metaclass__ = abc.ABCMeta
:param match_list: nested tuples of data to match against
:param target_dict: dict of object properties
:param credentials_dict: dict of actor properties
:returns: True if the check passes
@abc.abstractmethod
def __str__(self):
""" """
if not match_list: Retrieve a string representation of the Check tree rooted at
return True this node.
for and_list in match_list: """
if isinstance(and_list, basestring):
and_list = (and_list,) pass
if all([self._check(item, target_dict, cred_dict)
for item in and_list]): @abc.abstractmethod
return True def __call__(self, target, cred):
"""
Perform the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
pass
class FalseCheck(BaseCheck):
"""
A policy check that always returns False (disallow).
"""
def __str__(self):
"""Return a string representation of this check."""
return "!"
def __call__(self, target, cred):
"""Check the policy."""
return False return False
class HttpBrain(Brain): class TrueCheck(BaseCheck):
"""A brain that can check external urls for policy. """
A policy check that always returns True (allow).
Posts json blobs for target and credentials.
Note that this brain is deprecated; the http check is registered
by default.
""" """
pass def __str__(self):
"""Return a string representation of this check."""
return "@"
def __call__(self, target, cred):
"""Check the policy."""
return True
class Check(BaseCheck):
"""
A base class to allow for user-defined policy checks.
"""
def __init__(self, kind, match):
"""
:param kind: The kind of the check, i.e., the field before the
':'.
:param match: The match of the check, i.e., the field after
the ':'.
"""
self.kind = kind
self.match = match
def __str__(self):
"""Return a string representation of this check."""
return "%s:%s" % (self.kind, self.match)
class NotCheck(BaseCheck):
"""
A policy check that inverts the result of another policy check.
Implements the "not" operator.
"""
def __init__(self, rule):
"""
Initialize the 'not' check.
:param rule: The rule to negate. Must be a Check.
"""
self.rule = rule
def __str__(self):
"""Return a string representation of this check."""
return "not %s" % self.rule
def __call__(self, target, cred):
"""
Check the policy. Returns the logical inverse of the wrapped
check.
"""
return not self.rule(target, cred)
class AndCheck(BaseCheck):
"""
A policy check that requires that a list of other checks all
return True. Implements the "and" operator.
"""
def __init__(self, rules):
"""
Initialize the 'and' check.
:param rules: A list of rules that will be tested.
"""
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' and '.join(str(r) for r in self.rules)
def __call__(self, target, cred):
"""
Check the policy. Requires that all rules accept in order to
return True.
"""
for rule in self.rules:
if not rule(target, cred):
return False
return True
def add_check(self, rule):
"""
Allows addition of another rule to the list of rules that will
be tested. Returns the AndCheck object for convenience.
"""
self.rules.append(rule)
return self
class OrCheck(BaseCheck):
"""
A policy check that requires that at least one of a list of other
checks returns True. Implements the "or" operator.
"""
def __init__(self, rules):
"""
Initialize the 'or' check.
:param rules: A list of rules that will be tested.
"""
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' or '.join(str(r) for r in self.rules)
def __call__(self, target, cred):
"""
Check the policy. Requires that at least one rule accept in
order to return True.
"""
for rule in self.rules:
if rule(target, cred):
return True
return False
def add_check(self, rule):
"""
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
self.rules.append(rule)
return self
def _parse_check(rule):
"""
Parse a single base check rule into an appropriate Check object.
"""
# Handle the special checks
if rule == '!':
return FalseCheck()
elif rule == '@':
return TrueCheck()
try:
kind, match = rule.split(':', 1)
except Exception:
LOG.exception(_("Failed to understand rule %(rule)s") % locals())
# If the rule is invalid, we'll fail closed
return FalseCheck()
# Find what implements the check
if kind in _checks:
return _checks[kind](kind, match)
elif None in _checks:
return _checks[None](kind, match)
else:
LOG.error(_("No handler for matches of kind %s") % kind)
return FalseCheck()
def _parse_list_rule(rule):
"""
Provided for backwards compatibility. Translates the old
list-of-lists syntax into a tree of Check objects.
"""
# Empty rule defaults to True
if not rule:
return TrueCheck()
# Outer list is joined by "or"; inner list by "and"
or_list = []
for inner_rule in rule:
# Elide empty inner lists
if not inner_rule:
continue
# Handle bare strings
if isinstance(inner_rule, basestring):
inner_rule = [inner_rule]
# Parse the inner rules into Check objects
and_list = [_parse_check(r) for r in inner_rule]
# Append the appropriate check to the or_list
if len(and_list) == 1:
or_list.append(and_list[0])
else:
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
if len(or_list) == 0:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
return OrCheck(or_list)
# Used for tokenizing the policy language
_tokenize_re = re.compile(r'\s+')
def _parse_tokenize(rule):
"""
Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
because they can appear inside a check string. Thankfully, those
parentheses that appear inside a check string can never occur at
the very beginning or end ("%(variable)s" is the correct syntax).
"""
for tok in _tokenize_re.split(rule):
# Skip empty tokens
if not tok or tok.isspace():
continue
# Handle leading parens on the token
clean = tok.lstrip('(')
for i in range(len(tok) - len(clean)):
yield '(', '('
# If it was only parentheses, continue
if not clean:
continue
else:
tok = clean
# Handle trailing parens on the token
clean = tok.rstrip(')')
trail = len(tok) - len(clean)
# Yield the cleaned token
lowered = clean.lower()
if lowered in ('and', 'or', 'not'):
# Special tokens
yield lowered, clean
elif clean:
# Not a special token, but not composed solely of ')'
if len(tok) >= 2 and ((tok[0], tok[-1]) in
[('"', '"'), ("'", "'")]):
# It's a quoted string
yield 'string', tok[1:-1]
else:
yield 'check', _parse_check(clean)
# Yield the trailing parens
for i in range(trail):
yield ')', ')'
class ParseStateMeta(type):
"""
Metaclass for the ParseState class. Facilitates identifying
reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
"""
Create the class. Injects the 'reducers' list, a list of
tuples matching token sequences to the names of the
corresponding reduction methods.
"""
reducers = []
for key, value in cls_dict.items():
if not hasattr(value, 'reducers'):
continue
for reduction in value.reducers:
reducers.append((reduction, key))
cls_dict['reducers'] = reducers
return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
def reducer(*tokens):
"""
Decorator for reduction methods. Arguments are a sequence of
tokens, in order, which should trigger running this reduction
method.
"""
def decorator(func):
# Make sure we have a list of reducer sequences
if not hasattr(func, 'reducers'):
func.reducers = []
# Add the tokens to the list of reducer sequences
func.reducers.append(list(tokens))
return func
return decorator
class ParseState(object):
"""
Implement the core of parsing the policy language. Uses a greedy
reduction algorithm to reduce a sequence of tokens into a single
terminal, the value of which will be the root of the Check tree.
Note: error reporting is rather lacking. The best we can get with
this parser formulation is an overall "parse failed" error.
Fortunately, the policy language is simple enough that this
shouldn't be that big a problem.
"""
__metaclass__ = ParseStateMeta
def __init__(self):
"""Initialize the ParseState."""
self.tokens = []
self.values = []
def reduce(self):
"""
Perform a greedy reduction of the token stream. If a reducer
method matches, it will be executed, then the reduce() method
will be called recursively to search for any more possible
reductions.
"""
for reduction, methname in self.reducers:
if (len(self.tokens) >= len(reduction) and
self.tokens[-len(reduction):] == reduction):
# Get the reduction method
meth = getattr(self, methname)
# Reduce the token stream
results = meth(*self.values[-len(reduction):])
# Update the tokens and values
self.tokens[-len(reduction):] = [r[0] for r in results]
self.values[-len(reduction):] = [r[1] for r in results]
# Check for any more reductions
return self.reduce()
def shift(self, tok, value):
"""Adds one more token to the state. Calls reduce()."""
self.tokens.append(tok)
self.values.append(value)
# Do a greedy reduce...
self.reduce()
@property
def result(self):
"""
Obtain the final result of the parse. Raises ValueError if
the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
raise ValueError("Could not parse rule")
return self.values[0]
@reducer('(', 'check', ')')
@reducer('(', 'and_expr', ')')
@reducer('(', 'or_expr', ')')
def _wrap_check(self, _p1, check, _p2):
"""Turn parenthesized expressions into a 'check' token."""
return [('check', check)]
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
"""
Create an 'and_expr' from two checks joined by the 'and'
operator.
"""
return [('and_expr', AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
"""
Extend an 'and_expr' by adding one more check.
"""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
"""
Create an 'or_expr' from two checks joined by the 'or'
operator.
"""
return [('or_expr', OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
"""
Extend an 'or_expr' by adding one more check.
"""
return [('or_expr', or_expr.add_check(check))]
@reducer('not', 'check')
def _make_not_expr(self, _not, check):
"""Invert the result of another check."""
return [('check', NotCheck(check))]
def _parse_text_rule(rule):
"""
Translates a policy written in the policy language into a tree of
Check objects.
"""
# Empty rule means always accept
if not rule:
return TrueCheck()
# Parse the token stream
state = ParseState()
for tok, value in _parse_tokenize(rule):
state.shift(tok, value)
try:
return state.result
except ValueError:
# Couldn't parse the rule
LOG.exception(_("Failed to understand rule %(rule)r") % locals())
# Fail closed
return FalseCheck()
def parse_rule(rule):
"""
Parses a policy rule into a tree of Check objects.
"""
# If the rule is a string, it's in the policy language
if isinstance(rule, basestring):
return _parse_text_rule(rule)
return _parse_list_rule(rule)
def register(name, func=None): def register(name, func=None):
""" """
Register a function as a policy check. Register a function or Check class as a policy check.
:param name: Gives the name of the check type, e.g., 'rule', :param name: Gives the name of the check type, e.g., 'rule',
'role', etc. If name is None, a default function 'role', etc. If name is None, a default check type
will be registered. will be registered.
:param func: If given, provides the function to register. If not :param func: If given, provides the function or class to register.
given, returns a function taking one argument to If not given, returns a function taking one argument
specify the function to register, allowing use as a to specify the function or class to register,
decorator. allowing use as a decorator.
""" """
# Perform the actual decoration by registering the function. # Perform the actual decoration by registering the function or
# Returns the function for compliance with the decorator # class. Returns the function or class for compliance with the
# interface. # decorator interface.
def decorator(func): def decorator(func):
# Register the function _checks[name] = func
Brain._register(name, func)
return func return func
# If the function is given, do the registration # If the function or class is given, do the registration
if func: if func:
return decorator(func) return decorator(func)
@ -247,55 +721,59 @@ def register(name, func=None):
@register("rule") @register("rule")
def _check_rule(brain, match_kind, match, target_dict, cred_dict): class RuleCheck(Check):
"""Recursively checks credentials based on the brains rules.""" def __call__(self, target, creds):
try: """
new_match_list = brain.rules[match] Recursively checks credentials based on the defined rules.
except KeyError: """
if brain.default_rule and match != brain.default_rule:
new_match_list = ('rule:%s' % brain.default_rule,)
else:
return False
return brain.check(new_match_list, target_dict, cred_dict) try:
return _rules[self.match](target, creds)
except KeyError:
# We don't have any matching rule; fail closed
return False
@register("role") @register("role")
def _check_role(brain, match_kind, match, target_dict, cred_dict): class RoleCheck(Check):
"""Check that there is a matching role in the cred dict.""" def __call__(self, target, creds):
return match.lower() in [x.lower() for x in cred_dict['roles']] """Check that there is a matching role in the cred dict."""
return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http') @register('http')
def _check_http(brain, match_kind, match, target_dict, cred_dict): class HttpCheck(Check):
"""Check http: rules by calling to a remote server. def __call__(self, target, creds):
"""
Check http: rules by calling to a remote server.
This example implementation simply verifies that the response is This example implementation simply verifies that the response
exactly 'True'. A custom brain using response codes could easily is exactly 'True'.
be implemented. """
""" url = ('http:' + self.match) % target
url = 'http:' + (match % target_dict) data = {'target': jsonutils.dumps(target),
data = {'target': jsonutils.dumps(target_dict), 'credentials': jsonutils.dumps(creds)}
'credentials': jsonutils.dumps(cred_dict)} post_data = urllib.urlencode(data)
post_data = urllib.urlencode(data) f = urllib2.urlopen(url, post_data)
f = urllib2.urlopen(url, post_data) return f.read() == "True"
return f.read() == "True"
@register(None) @register(None)
def _check_generic(brain, match_kind, match, target_dict, cred_dict): class GenericCheck(Check):
"""Check an individual match. def __call__(self, target, creds):
"""
Check an individual match.
Matches look like: Matches look like:
tenant:%(tenant_id)s tenant:%(tenant_id)s
role:compute:admin role:compute:admin
"""
""" # TODO(termie): do dict inspection via dot syntax
match = self.match % target
# TODO(termie): do dict inspection via dot syntax if self.kind in creds:
match = match % target_dict return match == unicode(creds[self.kind])
if match_kind in cred_dict: return False
return match == unicode(cred_dict[match_kind])
return False

View File

@ -50,25 +50,26 @@ rpc_opts = [
default=['ceilometer.openstack.common.exception', default=['ceilometer.openstack.common.exception',
'nova.exception', 'nova.exception',
'cinder.exception', 'cinder.exception',
'exceptions',
], ],
help='Modules of exceptions that are permitted to be recreated' help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'), 'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit', cfg.BoolOpt('fake_rabbit',
default=False, default=False,
help='If passed, use a fake RabbitMQ provider'), help='If passed, use a fake RabbitMQ provider'),
# cfg.StrOpt('control_exchange',
# The following options are not registered here, but are expected to be default='openstack',
# present. The project using this library must register these options with help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
cfg.set_defaults(rpc_opts,
control_exchange=control_exchange)
def create_connection(new=True): def create_connection(new=True):
"""Create a connection to the message bus used for rpc. """Create a connection to the message bus used for rpc.
@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg): def notify(context, topic, msg, envelope=False):
"""Send notification event. """Send notification event.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
request. request.
:param topic: The topic to send the notification to. :param topic: The topic to send the notification to.
:param msg: This is a dict of content of event. :param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None :returns: None
""" """
return _get_impl().notify(cfg.CONF, context, topic, msg) return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup(): def cleanup():

View File

@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
""" """
import inspect import inspect
import logging
import sys import sys
import uuid import uuid
@ -34,10 +33,10 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import excutils from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import common as rpc_common from ceilometer.openstack.common.rpc import common as rpc_common
@ -55,7 +54,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
LOG.debug('Pool creating new connection') LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf) return self.connection_cls(self.conf)
def empty(self): def empty(self):
@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False): ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id. """Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple. Failure should be a sys.exc_info() tuple.
@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
""" """
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if failure: if failure:
failure = rpc_common.serialize_remote_exception(failure) failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try: try:
msg = {'result': reply, 'failure': failure} msg = {'result': reply, 'failure': failure}
@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure} 'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
conn.direct_send(msg_id, msg) conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext): class RpcContext(rpc_common.CommonRpcContext):
@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values) return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False, def reply(self, reply=None, failure=None, ending=False,
connection_pool=None): connection_pool=None, log_failure=True):
if self.msg_id: if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending) ending, log_failure)
if ending: if ending:
self.msg_id = None self.msg_id = None
@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool) ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool) ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e: except rpc_common.ClientException as e:
LOG.exception('Exception during message handling') LOG.debug(_('Expected exception during message handling (%s)') %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(), ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool) connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object): class MulticallWaiter(object):
def __init__(self, conf, connection, timeout): def __init__(self, conf, connection, timeout):
@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done, # that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into # connection.close() will get called which will put it back into
# the pool # the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic) LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg return wait_msg
@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool):
@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg, def fanout_cast_to_server(conf, context, server_params, topic, msg,
@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool): def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
event_type = msg.get('event_type') LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg) conn.notify_send(topic, msg)
@ -420,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf): def get_control_exchange(conf):
try: return conf.control_exchange
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -18,18 +18,61 @@
# under the License. # under the License.
import copy import copy
import logging import sys
import traceback import traceback
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'ceilometer.version': <RPC Envelope Version as a String>,
'ceilometer.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'ceilometer.version'
_MESSAGE_KEY = 'ceilometer.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") message = _("An unknown RPC related exception occurred.")
@ -40,7 +83,7 @@ class RPCException(Exception):
try: try:
message = self.message % kwargs message = self.message % kwargs
except Exception as e: except Exception:
# kwargs doesn't match a variable in the message # kwargs doesn't match a variable in the message
# log the issue and the kwargs # log the issue and the kwargs
LOG.exception(_('Exception in string format operation')) LOG.exception(_('Exception in string format operation'))
@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object): class Connection(object):
"""A connection, returned by rpc.create_connection(). """A connection, returned by rpc.create_connection().
@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',), SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': ('admin_password',), } 'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data has_context_token = '_context_auth_token' in msg_data
@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data) msg_data = copy.deepcopy(msg_data)
if has_method: if has_method:
method = msg_data['method'] for arg in SANITIZE.get(msg_data['method'], []):
if method in SANITIZE: try:
args_to_sanitize = SANITIZE[method] d = msg_data
for arg in args_to_sanitize: for elem in arg[:-1]:
try: d = d[elem]
msg_data['args'][arg] = "<SANITIZED>" d[arg[-1]] = '<SANITIZED>'
except KeyError: except KeyError, e:
pass LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token: if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>' msg_data['_context_auth_token'] = '<SANITIZED>'
@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data) return log_func(msg, msg_data)
def serialize_remote_exception(failure_info): def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc. """Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple. Failure_info should be a sys.exc_info() tuple.
@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
""" """
tb = traceback.format_exception(*failure_info) tb = traceback.format_exception(*failure_info)
failure = failure_info[1] failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure)) if log_failure:
LOG.error(tb) LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {} kwargs = {}
if hasattr(failure, 'kwargs'): if hasattr(failure, 'kwargs'):
@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override # we cannot necessarily change an exception message so we must override
# the __str__ method. # the __str__ method.
failure.__class__ = new_ex_type failure.__class__ = new_ex_type
except TypeError as e: except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the # NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument. # first exception argument.
failure.args = (message,) + failure.args[1:] failure.args = (message,) + failure.args[1:]
@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted context.values['read_deleted'] = read_deleted
return context return context
class ClientException(Exception):
"""This encapsulates some actual exception that is expected to be
hit by an RPC proxy object. Merely instantiating it records the
current exception information, which will be passed back to the
RPC client without exceptional logging."""
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
if type(e) in exceptions:
raise ClientException()
else:
raise
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
ClientException, which is used internally by the RPC layer."""
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code there can be both versioned and unversioned APIs implemented in the same code
base. base.
EXAMPLES
EXAMPLES: ========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server API as an example. The client side is in nova/compute/rpcapi.py and the server
@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
Example 1) Adding a new method. Example 1) Adding a new method.
-------------------------------
Adding a new method is a backwards compatible change. It should be added to Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example: be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host): def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None) topic = _compute_topic(self.topic, ctxt, host, None)
@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter. Example 2) Adding a new parameter.
----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present. The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None): def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases # The code needs to deal with newarg=None for cases
@ -101,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks self.callbacks = callbacks
super(RpcDispatcher, self).__init__() super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs): def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version. """Dispatch a message based on a requested version.
@ -137,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION rpc_api_version = proxyobj.RPC_API_VERSION
else: else:
rpc_api_version = '1.0' rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version) is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method): if not hasattr(proxyobj, method):
continue continue

View File

@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests.
""" """
import inspect import inspect
# NOTE(russellb): We specifically want to use json, not our own jsonutils.
# jsonutils has some extra logic to automatically convert objects to primitive
# types so that they can be serialized. We want to catch all cases where
# non-primitive types make it into this code and treat it as an error.
import json
import time import time
import eventlet import eventlet
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common.rpc import common as rpc_common from ceilometer.openstack.common.rpc import common as rpc_common
CONSUMERS = {} CONSUMERS = {}
@ -75,6 +79,8 @@ class Consumer(object):
else: else:
res.append(rval) res.append(rval)
done.send(res) done.send(res)
except rpc_common.ClientException as e:
done.send_exception(e._exc_info[1])
except Exception as e: except Exception as e:
done.send_exception(e) done.send_exception(e)
@ -121,7 +127,7 @@ def create_connection(conf, new=True):
def check_serialize(msg): def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized.""" """Make sure a message intended for rpc can be serialized."""
jsonutils.dumps(msg) json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None): def multicall(conf, context, topic, msg, timeout=None):
@ -154,13 +160,14 @@ def call(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg): def cast(conf, context, topic, msg):
check_serialize(msg)
try: try:
call(conf, context, topic, msg) call(conf, context, topic, msg)
except Exception: except Exception:
pass pass
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
check_serialize(msg) check_serialize(msg)

View File

@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: try:
callback(message.payload) msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack() message.ack()
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options # Default options
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id, exchange = kombu.entity.Exchange(name=msg_id,
type='direct', type='direct',
@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False, options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'], durable=options['durable'],
@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id, super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options) type='direct', **options)
@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
""" """
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options) None, type='fanout', **options)
@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
self.consumers = [] self.consumers = []
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
self.max_retries = self.conf.rabbit_max_retries self.max_retries = self.conf.rabbit_max_retries
# Try forever? # Try forever?
@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.release()
except self.connection_errors: except self.connection_errors:
pass pass
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
@ -573,12 +575,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release() self.connection.release()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close() self.channel.close()
self.channel = self.connection.channel() self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3 # work around 'memory' transport bug in 1.1.3
@ -644,6 +648,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
self.declare_fanout_consumer(topic, proxy_cb) self.declare_fanout_consumer(topic, proxy_cb)
@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify( return rpc_amqp.notify(
conf, context, topic, msg, conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -17,21 +17,23 @@
import functools import functools
import itertools import itertools
import logging
import time import time
import uuid import uuid
import eventlet import eventlet
import greenlet import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import amqp as rpc_amqp from ceilometer.openstack.common.rpc import amqp as rpc_amqp
from ceilometer.openstack.common.rpc import common as rpc_common from ceilometer.openstack.common.rpc import common as rpc_common
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
qpid_opts = [ qpid_opts = [
@ -41,6 +43,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port', cfg.StrOpt('qpid_port',
default='5672', default='5672',
help='Qpid broker port'), help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username', cfg.StrOpt('qpid_username',
default='', default='',
help='Username for qpid connection'), help='Username for qpid connection'),
@ -121,7 +126,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object""" """Fetch the message and pass it to the callback object"""
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self.callback(message.content) msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
@ -271,28 +277,38 @@ class Connection(object):
pool = None pool = None
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = { params = {
'hostname': self.conf.qpid_hostname, 'qpid_hosts': self.conf.qpid_hosts,
'port': self.conf.qpid_port,
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
'password': self.conf.qpid_password, 'password': self.conf.qpid_password,
} }
params.update(server_params or {}) params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port']) self.brokers = params['qpid_hosts']
self.username = params['username'] self.username = params['username']
self.password = params['password'] self.password = params['password']
self.connection_create() self.connection_create(self.brokers[0])
self.reconnect() self.reconnect()
def connection_create(self): def connection_create(self, broker):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker) self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
@ -317,15 +333,19 @@ class Connection(object):
if self.connection.opened(): if self.connection.opened():
try: try:
self.connection.close() self.connection.close()
except qpid.messaging.exceptions.ConnectionError: except qpid_exceptions.ConnectionError:
pass pass
attempt = 0
delay = 1 delay = 1
while True: while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try: try:
self.connection_create() self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. " msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict "Sleeping %(delay)s seconds") % msg_dict
@ -333,10 +353,9 @@ class Connection(object):
time.sleep(delay) time.sleep(delay)
delay = min(2 * delay, 60) delay = min(2 * delay, 60)
else: else:
LOG.info(_('Connected to AMQP server on %s'), broker)
break break
LOG.info(_('Connected to AMQP server on %s'), self.broker)
self.session = self.connection.session() self.session = self.connection.session()
if self.consumers: if self.consumers:
@ -353,8 +372,8 @@ class Connection(object):
while True: while True:
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty, except (qpid_exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e: qpid_exceptions.ConnectionError), e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()
@ -362,12 +381,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close() self.connection.close()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close() self.session.close()
self.session = self.connection.session() self.session = self.connection.session()
self.consumers = {} self.consumers = {}
@ -392,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers""" """Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc): def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty): if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') % LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc)) str(exc))
raise rpc_common.Timeout() raise rpc_common.Timeout()
@ -422,6 +443,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg): def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -497,6 +523,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -512,6 +539,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name) name=pool_name)
@ -570,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg, return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import pprint import pprint
import socket import socket
import string import string
@ -22,15 +23,16 @@ import types
import uuid import uuid
import eventlet import eventlet
from eventlet.green import zmq
import greenlet import greenlet
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import processutils as utils
from ceilometer.openstack.common.rpc import common as rpc_common from ceilometer.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified. # for convenience, are not modified.
pformat = pprint.pformat pformat = pprint.pformat
@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1, cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'), help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'), help='Directory for holding IPC sockets'),
@ -70,9 +76,9 @@ zmq_opts = [
] ]
# These globals are defined in register_opts(conf), CONF = cfg.CONF
# a mandatory initialization call CONF.register_opts(zmq_opts)
CONF = None
ZMQ_CTX = None # ZeroMQ Context, must be global. ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object matchmaker = None # memoized matchmaker object
@ -107,7 +113,7 @@ class ZmqSocket(object):
""" """
def __init__(self, addr, zmq_type, bind=True, subscribe=None): def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type) self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr self.addr = addr
self.type = zmq_type self.type = zmq_type
self.subscriptions = [] self.subscriptions = []
@ -181,11 +187,15 @@ class ZmqSocket(object):
pass pass
self.subscriptions = [] self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try: try:
self.sock.close(linger=-1) # Default is to linger
self.sock.close()
except Exception: except Exception:
pass # While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None self.sock = None
def recv(self): def recv(self):
@ -202,10 +212,14 @@ class ZmqSocket(object):
class ZmqClient(object): class ZmqClient(object):
"""Client for ZMQ sockets.""" """Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False): def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)]) _serialize(data)])
@ -250,7 +264,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic.""" """Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict()) LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
try: try:
result = proxy.dispatch( result = proxy.dispatch(
@ -259,7 +273,14 @@ class InternalContext(object):
except greenlet.GreenletExit: except greenlet.GreenletExit:
# ignore these since they are just from shutdowns # ignore these since they are just from shutdowns
pass pass
except rpc_common.ClientException, e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception: except Exception:
LOG.error(_("Exception during message handling"))
return {'exc': return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())} rpc_common.serialize_remote_exception(sys.exc_info())}
@ -314,7 +335,7 @@ class ConsumerBase(object):
return return
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
proxy.dispatch(ctx, data['version'], proxy.dispatch(ctx, data['version'],
data['method'], **data['args']) data['method'], **data['args'])
@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf) super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {} self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock): def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'): elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB sock_type = zmq.PUB
inside = _deserialize(in_msg) inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id'] msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response'] response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response) LOG.debug(_("->response->%s"), response)
@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH sock_type = zmq.PUSH
if not topic in self.topic_proxy: if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), def publisher(waiter):
sock_type, bind=True) LOG.info(_("Creating proxy for topic: %s"), topic)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
# It takes some time for a pub socket to open, try:
# before we can have any faith in doing a send() to it. out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
if sock_type == zmq.PUB: (ipc_dir, topic),
eventlet.sleep(.5) sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic] = eventlet.queue.LightQueue(
self.topic_proxy[topic].send(data) CONF.rpc_zmq_topic_backlog)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor): class ZmqReactor(ZmqBaseReactor):
@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg) ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx) ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock] proxy = self.proxies[sock]
@ -524,7 +600,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None): def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload) conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn.close() conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None): def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
) )
LOG.debug(_("Sending cast")) LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload) _cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply")) LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply # Blocks until receives reply
@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1] return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None): def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
conf = CONF conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic) queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues) LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results # Don't stack if we have no matchmaker results
@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout) _topic, _topic, msg, timeout, serialize,
force_envelope)
return return
return method(_addr, context, _topic, _topic, msg, timeout) return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
def create_connection(conf, new=True): def create_connection(conf, new=True):
@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not # NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions. # work with our assumptions.
topic.replace('.', '-') topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs) cast(conf, context, topic, msg, **kwargs)
def cleanup(): def cleanup():
"""Clean up resources in use by implementation.""" """Clean up resources in use by implementation."""
global ZMQ_CTX global ZMQ_CTX
global matchmaker if ZMQ_CTX:
matchmaker = None ZMQ_CTX.term()
ZMQ_CTX.term()
ZMQ_CTX = None ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker global matchmaker
global CONF matchmaker = None
if not CONF:
conf.register_opts(zmq_opts) def _get_ctxt():
CONF = conf if not zmq:
# Don't re-set, if this method is called twice. raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX: if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker():
global matchmaker
if not matchmaker: if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class' # rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.') mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1]) mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1] mm_class = mm_path[-1]
@ -713,6 +794,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module) mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class) mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor() matchmaker = mm_constructor()
return matchmaker
register_opts(cfg.CONF)

View File

@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib import contextlib
import itertools import itertools
import json import json
import logging
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
matchmaker_opts = [ matchmaker_opts = [

View File

@ -27,20 +27,17 @@ import sys
import time import time
import eventlet import eventlet
import greenlet
import logging as std_logging import logging as std_logging
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import eventlet_backdoor from ceilometer.openstack.common import eventlet_backdoor
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import threadgroup from ceilometer.openstack.common import threadgroup
try:
from ceilometer.openstack.common import rpc
except ImportError:
rpc = None
rpc = importutils.try_import('ceilometer.openstack.common.rpc')
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -54,7 +51,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = [] self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled() eventlet_backdoor.initialize_if_enabled()
@staticmethod @staticmethod
@ -75,8 +72,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
gt = eventlet.spawn(self.run_service, service) self._services.add_thread(self.run_service, service)
self._services.append(gt)
def stop(self): def stop(self):
"""Stop all services which are currently running. """Stop all services which are currently running.
@ -84,8 +80,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
for service in self._services: self._services.stop()
service.kill()
def wait(self): def wait(self):
"""Waits until all services have been stopped, and then returns. """Waits until all services have been stopped, and then returns.
@ -93,11 +88,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
for service in self._services: self._services.wait()
try:
service.wait()
except greenlet.GreenletExit:
pass
class SignalExit(SystemExit): class SignalExit(SystemExit):
@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc: except SystemExit as exc:
status = exc.code status = exc.code
finally: finally:
self.stop()
if rpc: if rpc:
rpc.cleanup() rpc.cleanup()
self.stop()
return status return status
@ -252,7 +243,10 @@ class ProcessLauncher(object):
def _wait_child(self): def _wait_child(self):
try: try:
pid, status = os.wait() # Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc: except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD): if exc.errno not in (errno.EINTR, errno.ECHILD):
raise raise
@ -260,10 +254,12 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status): if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status) sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else: else:
code = os.WEXITSTATUS(status) code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) LOG.info(_('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children: if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid) LOG.warning(_('pid %d not in child list'), pid)
@ -282,6 +278,10 @@ class ProcessLauncher(object):
while self.running: while self.running:
wrap = self._wait_child() wrap = self._wait_child()
if not wrap: if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue continue
while self.running and len(wrap.children) < wrap.workers: while self.running and len(wrap.children) < wrap.workers:
@ -309,8 +309,8 @@ class ProcessLauncher(object):
class Service(object): class Service(object):
"""Service object for binaries running on hosts.""" """Service object for binaries running on hosts."""
def __init__(self): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service') self.tg = threadgroup.ThreadGroup(threads)
def start(self): def start(self):
pass pass

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC. # Copyright 2011 OpenStack LLC.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,7 +20,7 @@
Utilities with minimum-depends for use in setup.py Utilities with minimum-depends for use in setup.py
""" """
import datetime import email
import os import os
import re import re
import subprocess import subprocess
@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
if os.path.exists(mailmap): if os.path.exists(mailmap):
with open(mailmap, 'r') as fp: with open(mailmap, 'r') as fp:
for l in fp: for l in fp:
l = l.strip() try:
if not l.startswith('#') and ' ' in l: canonical_email, alias = re.match(
canonical_email, alias = [x for x in l.split(' ') r'[^#]*?(<.+>).*(<.+>).*', l).groups()
if x.startswith('<')] except AttributeError:
mapping[alias] = canonical_email continue
mapping[alias] = canonical_email
return mapping return mapping
@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all """Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email. instances of the aliases in the string with their real email.
""" """
for alias, email in mapping.iteritems(): for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email) changelog = changelog.replace(alias, email_address)
return changelog return changelog
@ -106,23 +108,17 @@ def parse_dependency_links(requirements_files=['requirements.txt',
return dependency_links return dependency_links
def write_requirements(): def _run_shell_command(cmd, throw_on_error=False):
venv = os.environ.get('VIRTUAL_ENV', None)
if venv is not None:
with open("requirements.txt", "w") as req_file:
output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
stdout=subprocess.PIPE)
requirements = output.communicate()[0].strip()
req_file.write(requirements)
def _run_shell_command(cmd):
if os.name == 'nt': if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd], output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE) stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else: else:
output = subprocess.Popen(["/bin/sh", "-c", cmd], output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE) stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if output.returncode and throw_on_error:
raise Exception("%s returned %d" % cmd, output.returncode)
out = output.communicate() out = output.communicate()
if len(out) == 0: if len(out) == 0:
return None return None
@ -131,57 +127,6 @@ def _run_shell_command(cmd):
return out[0].strip() return out[0].strip()
def _get_git_next_version_suffix(branch_name):
datestamp = datetime.datetime.now().strftime('%Y%m%d')
if branch_name == 'milestone-proposed':
revno_prefix = "r"
else:
revno_prefix = ""
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
if milestonever:
first_half = "%s~%s" % (milestonever, datestamp)
else:
first_half = datestamp
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))
def _get_git_current_tag():
return _run_shell_command("git tag --contains HEAD")
def _get_git_tag_info():
return _run_shell_command("git describe --tags")
def _get_git_post_version():
current_tag = _get_git_current_tag()
if current_tag is not None:
return current_tag
else:
tag_info = _get_git_tag_info()
if tag_info is None:
base_version = "0.0"
cmd = "git --no-pager log --oneline"
out = _run_shell_command(cmd)
revno = len(out.split("\n"))
sha = _run_shell_command("git describe --always")
else:
tag_infos = tag_info.split("-")
base_version = "-".join(tag_infos[:-2])
(revno, sha) = tag_infos[-2:]
return "%s.%s.%s" % (base_version, revno, sha)
def write_git_changelog(): def write_git_changelog():
"""Write a changelog based on the git changelog.""" """Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog' new_changelog = 'ChangeLog'
@ -227,26 +172,6 @@ _rst_template = """%(heading)s
""" """
def read_versioninfo(project):
"""Read the versioninfo file. If it doesn't exist, we're in a github
zipball, and there's really no way to know what version we really
are, but that should be ok, because the utility of that should be
just about nil if this code path is in use in the first place."""
versioninfo_path = os.path.join(project, 'versioninfo')
if os.path.exists(versioninfo_path):
with open(versioninfo_path, 'r') as vinfo:
version = vinfo.read().strip()
else:
version = "0.0.0"
return version
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
with open(os.path.join(project, 'versioninfo'), 'w') as fil:
fil.write("%s\n" % version)
def get_cmdclass(): def get_cmdclass():
"""Return dict of commands to run from setup.py.""" """Return dict of commands to run from setup.py."""
@ -276,6 +201,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc): class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self): def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir) print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {} modules = {}
@ -311,56 +239,83 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'): if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex() self.generate_autoindex()
for builder in ['html', 'man']: for builder in self.builders:
self.builder = builder self.builder = builder
self.finalize_options() self.finalize_options()
self.project = self.distribution.get_name() self.project = self.distribution.get_name()
self.version = self.distribution.get_version() self.version = self.distribution.get_version()
self.release = self.distribution.get_version() self.release = self.distribution.get_version()
BuildDoc.run(self) BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError: except ImportError:
pass pass
return cmdclass return cmdclass
def get_git_branchname(): def get_version_from_git(pre_version):
for branch in _run_shell_command("git branch --color=never").split("\n"):
if branch.startswith('*'):
_branch_name = branch.split()[1].strip()
if _branch_name == "(no":
_branch_name = "no-branch"
return _branch_name
def get_pre_version(projectname, base_version):
"""Return a version which is leading up to a version that will
be released in the future."""
if os.path.isdir('.git'):
current_tag = _get_git_current_tag()
if current_tag is not None:
version = current_tag
else:
branch_name = os.getenv('BRANCHNAME',
os.getenv('GERRIT_REFNAME',
get_git_branchname()))
version_suffix = _get_git_next_version_suffix(branch_name)
version = "%s~%s" % (base_version, version_suffix)
write_versioninfo(projectname, version)
return version
else:
version = read_versioninfo(projectname)
return version
def get_post_version(projectname):
"""Return a version which is equal to the tag that's on the current """Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions revision if there is one, or tag plus number of additional revisions
if the current revision has no tag.""" if the current revision has no tag."""
if os.path.isdir('.git'): if os.path.isdir('.git'):
version = _get_git_post_version() if pre_version:
write_versioninfo(projectname, version) try:
return _run_shell_command(
"git describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
sha = _run_shell_command("git log -n1 --pretty=format:%h")
describe = _run_shell_command("git describe --always")
revno = describe.rsplit("-", 2)[-2]
return "%s.a%s.g%s" % (pre_version, revno, sha)
else:
return _run_shell_command(
"git describe --always").replace('-', '.')
return None
def get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name, pre_version=None):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version return version
return read_versioninfo(projectname) version = get_version_from_pkg_info(package_name)
if version:
return version
version = get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -18,7 +18,6 @@ from eventlet import greenlet
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenthread from eventlet import greenthread
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import loopingcall from ceilometer.openstack.common import loopingcall
@ -27,22 +26,19 @@ LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs): def _thread_done(gt, *args, **kwargs):
''' """ Callback function to be passed to GreenThread.link() when we spawn()
Callback function to be passed to GreenThread.link() when we spawn() Calls the :class:`ThreadGroup` to notify if.
Calls the ThreadGroup to notify if.
''' """
kwargs['group'].thread_done(kwargs['thread']) kwargs['group'].thread_done(kwargs['thread'])
class Thread(object): class Thread(object):
""" Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
""" """
Wrapper around a greenthread, that holds a reference to def __init__(self, thread, group):
the ThreadGroup. The Thread will notify the ThreadGroup
when it has done so it can be removed from the threads
list.
"""
def __init__(self, name, thread, group):
self.name = name
self.thread = thread self.thread = thread
self.thread.link(_thread_done, group=group, thread=self) self.thread.link(_thread_done, group=group, thread=self)
@ -54,14 +50,13 @@ class Thread(object):
class ThreadGroup(object): class ThreadGroup(object):
""" """ The point of the ThreadGroup classis to:
The point of this class is to:
- keep track of timers and greenthreads (making it easier to stop them * keep track of timers and greenthreads (making it easier to stop them
when need be). when need be).
- provide an easy API to add timers. * provide an easy API to add timers.
""" """
def __init__(self, name, thread_pool_size=10): def __init__(self, thread_pool_size=10):
self.name = name
self.pool = greenpool.GreenPool(thread_pool_size) self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = [] self.threads = []
self.timers = [] self.timers = []
@ -75,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs): def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs) gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self) th = Thread(gt, self)
self.threads.append(th) self.threads.append(th)
def thread_done(self, thread): def thread_done(self, thread):

View File

@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds): def is_older_than(before, seconds):
"""Return True if before is older than seconds.""" """Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds) return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds): def is_newer_than(after, seconds):
"""Return True if after is newer than seconds.""" """Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds) return after - utcnow() > datetime.timedelta(seconds=seconds)
@ -87,7 +91,10 @@ def utcnow_ts():
def utcnow(): def utcnow():
"""Overridable version of utils.utcnow.""" """Overridable version of utils.utcnow."""
if utcnow.override_time: if utcnow.override_time:
return utcnow.override_time try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow() return datetime.datetime.utcnow()
@ -95,14 +102,21 @@ utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()): def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time.""" """
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
"""
utcnow.override_time = override_time utcnow.override_time = override_time
def advance_time_delta(timedelta): def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta.""" """Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None) assert(not utcnow.override_time is None)
utcnow.override_time += timedelta try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds): def advance_time_seconds(seconds):
@ -135,3 +149,16 @@ def unmarshall_time(tyme):
minute=tyme['minute'], minute=tyme['minute'],
second=tyme['second'], second=tyme['second'],
microsecond=tyme['microsecond']) microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC # Copyright 2012 OpenStack LLC
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -15,134 +15,65 @@
# under the License. # under the License.
""" """
Utilities for consuming the auto-generated versioninfo files. Utilities for consuming the version from pkg_resources.
""" """
import datetime
import pkg_resources import pkg_resources
import setup
class _deferred_version_string(object):
"""Internal helper class which provides delayed version calculation."""
def __init__(self, version_info, prefix):
self.version_info = version_info
self.prefix = prefix
def __str__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
def __repr__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
class VersionInfo(object): class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None): def __init__(self, package):
"""Object that understands versioning for a package """Object that understands versioning for a package
:param package: name of the top level python namespace. For glance, :param package: name of the python package, such as glance, or
this would be "glance" for python-glanceclient, it python-glanceclient
would be "glanceclient"
:param python_package: optional name of the project name. For
glance this can be left unset. For
python-glanceclient, this would be
"python-glanceclient"
:param pre_version: optional version that the project is working to
""" """
self.package = package self.package = package
if python_package is None: self.release = None
self.python_package = package
else:
self.python_package = python_package
self.pre_version = pre_version
self.version = None self.version = None
self._cached_version = None
def _generate_version(self): def _get_version_from_pkg_resources(self):
"""Defer to the openstack.common.setup routines for making a """Get the version of the package from the pkg_resources record
version from git.""" associated with the package."""
if self.pre_version is None: requirement = pkg_resources.Requirement.parse(self.package)
return setup.get_post_version(self.python_package) provider = pkg_resources.get_provider(requirement)
else: return provider.version
return setup.get_pre_version(self.python_package, self.pre_version)
def _newer_version(self, pending_version): def release_string(self):
"""Check to see if we're working with a stale version or not.
We expect a version string that either looks like:
2012.2~f3~20120708.10.4426392
which is an unreleased version of a pre-version, or:
0.1.1.4.gcc9e28a
which is an unreleased version of a post-version, or:
0.1.1
Which is a release and which should match tag.
For now, if we have a date-embedded version, check to see if it's
old, and if so re-generate. Otherwise, just deal with it.
"""
try:
version_date = int(self.version.split("~")[-1].split('.')[0])
if version_date < int(datetime.date.today().strftime('%Y%m%d')):
return self._generate_version()
else:
return pending_version
except Exception:
return pending_version
def version_string_with_vcs(self, always=False):
"""Return the full version of the package including suffixes indicating """Return the full version of the package including suffixes indicating
VCS status. VCS status.
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 if this is a final
release, or else something like 2012.2~f1~20120705.20 if it's not.
:param always: if true, skip all version caching
""" """
if always: if self.release is None:
self.version = self._generate_version() self.release = self._get_version_from_pkg_resources()
return self.release
def version_string(self):
"""Return the short version minus any alpha/beta tags."""
if self.version is None: if self.version is None:
parts = []
requirement = pkg_resources.Requirement.parse(self.python_package) for part in self.release_string().split('.'):
versioninfo = "%s/versioninfo" % self.package if part[0].isdigit():
try: parts.append(part)
raw_version = pkg_resources.resource_string(requirement, else:
versioninfo) break
self.version = self._newer_version(raw_version.strip()) self.version = ".".join(parts)
except (IOError, pkg_resources.DistributionNotFound):
self.version = self._generate_version()
return self.version return self.version
def canonical_version_string(self, always=False): # Compatibility functions
"""Return the simple version of the package excluding any suffixes. canonical_version_string = version_string
version_string_with_vcs = release_string
For instance, if we are working towards the 2012.2 release, def cached_version_string(self, prefix=""):
canonical_version_string should return 2012.2 in all cases.
:param always: if true, skip all version caching
"""
return self.version_string_with_vcs(always).split('~')[0]
def version_string(self, always=False):
"""Return the base version of the package.
For instance, if we are working towards the 2012.2 release,
version_string should return 2012.2 if this is a final release, or
2012.2-dev if it is not.
:param always: if true, skip all version caching
"""
version_parts = self.version_string_with_vcs(always).split('~')
if len(version_parts) == 1:
return version_parts[0]
else:
return '%s-dev' % (version_parts[0],)
def deferred_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to """Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested rather only do the calculation when and if a version is requested
""" """
return _deferred_version_string(self, prefix) if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -49,26 +49,18 @@ def init():
if not _POLICY_PATH: if not _POLICY_PATH:
raise cfg.ConfigFilesNotFoundError([cfg.CONF.policy_file]) raise cfg.ConfigFilesNotFoundError([cfg.CONF.policy_file])
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE, utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
reload_func=_set_brain) reload_func=_set_rules)
def _set_brain(data): def _set_rules(data):
default_rule = cfg.CONF.policy_default_rule default_rule = cfg.CONF.policy_default_rule
policy.set_brain(policy.Brain.load_json(data, default_rule)) policy.set_rules(policy.Rules.load_json(data, default_rule))
def check_is_admin(roles, project_id, project_name): def check_is_admin(roles):
"""Whether or not roles contains 'admin' role according to policy setting. """Whether or not roles contains 'admin' role according to policy setting.
""" """
init() init()
match_list = ('rule:context_is_admin',) return policy.check('context_is_admin', {}, {'roles': roles})
target = {}
credentials = {
'roles': roles,
'project_id': project_id,
'project_name': project_name,
}
return policy.enforce(match_list, target, credentials)

View File

@ -30,9 +30,6 @@ PUBLISH_OPTS = [
default='metering', default='metering',
help='the topic ceilometer uses for metering messages', help='the topic ceilometer uses for metering messages',
), ),
cfg.StrOpt('control_exchange',
default='ceilometer',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]

View File

@ -21,6 +21,7 @@ import os
import socket import socket
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common.rpc import service as rpc_service
@ -77,5 +78,6 @@ def _sanitize_cmd_line(argv):
def prepare_service(argv=[]): def prepare_service(argv=[]):
rpc.set_defaults(control_exchange='ceilometer')
cfg.CONF(argv[1:], project='ceilometer') cfg.CONF(argv[1:], project='ceilometer')
log.setup('ceilometer') log.setup('ceilometer')

View File

@ -22,5 +22,4 @@ from ceilometer.openstack.common import version as common_version
NEXT_VERSION = '2013.1' NEXT_VERSION = '2013.1'
version_info = common_version.VersionInfo('ceilometer', version_info = common_version.VersionInfo('ceilometer')
pre_version=NEXT_VERSION)

View File

@ -22,13 +22,12 @@ import os
import setuptools import setuptools
from ceilometer.openstack.common import setup as common_setup from ceilometer.openstack.common import setup as common_setup
from ceilometer.version import version_info from ceilometer.version import NEXT_VERSION
requires = common_setup.parse_requirements(['tools/pip-requires']) requires = common_setup.parse_requirements(['tools/pip-requires'])
depend_links = common_setup.parse_dependency_links(['tools/pip-requires']) depend_links = common_setup.parse_dependency_links(['tools/pip-requires'])
url_base = 'http://tarballs.openstack.org/ceilometer/ceilometer-%s.tar.gz' url_base = 'http://tarballs.openstack.org/ceilometer/ceilometer-%s.tar.gz'
version_string = version_info.canonical_version_string(always=True)
def directories(target_dir): def directories(target_dir):
@ -39,7 +38,7 @@ def directories(target_dir):
setuptools.setup( setuptools.setup(
name='ceilometer', name='ceilometer',
version=version_string, version=NEXT_VERSION,
description='cloud computing metering', description='cloud computing metering',
@ -47,7 +46,7 @@ setuptools.setup(
author_email='ceilometer@lists.launchpad.net', author_email='ceilometer@lists.launchpad.net',
url='https://launchpad.net/ceilometer', url='https://launchpad.net/ceilometer',
download_url=url_base % version_string, download_url=url_base % NEXT_VERSION,
classifiers=[ classifiers=[
'Development Status :: 3 - Alpha', 'Development Status :: 3 - Alpha',

View File

@ -35,17 +35,17 @@ class TestStatisticsDuration(unittest.TestCase):
# Create events relative to the range and pretend # Create events relative to the range and pretend
# that the intervening events exist. # that the intervening events exist.
self.early1 = datetime.datetime(2012, 8, 27, 7, 0) self.early1 = datetime.datetime(2012, 8, 27, 7, 0)
self.early2 = datetime.datetime(2012, 8, 27, 17, 0) self.early2 = datetime.datetime(2012, 8, 27, 17, 0)
self.start = datetime.datetime(2012, 8, 28, 0, 0) self.start = datetime.datetime(2012, 8, 28, 0, 0)
self.middle1 = datetime.datetime(2012, 8, 28, 8, 0) self.middle1 = datetime.datetime(2012, 8, 28, 8, 0)
self.middle2 = datetime.datetime(2012, 8, 28, 18, 0) self.middle2 = datetime.datetime(2012, 8, 28, 18, 0)
self.end = datetime.datetime(2012, 8, 28, 23, 59) self.end = datetime.datetime(2012, 8, 28, 23, 59)
self.late1 = datetime.datetime(2012, 8, 29, 9, 0) self.late1 = datetime.datetime(2012, 8, 29, 9, 0)
self.late2 = datetime.datetime(2012, 8, 29, 19, 0) self.late2 = datetime.datetime(2012, 8, 29, 19, 0)
def test_nulls(self): def test_nulls(self):

View File

@ -19,10 +19,21 @@
import subprocess import subprocess
import unittest import unittest
import tempfile
import os
class BinDbsyncTestCase(unittest.TestCase): class BinDbsyncTestCase(unittest.TestCase):
def setUp(self):
self.tempfile = tempfile.mktemp()
with open(self.tempfile, 'w') as tmp:
tmp.write("[DEFAULT]\n")
tmp.write("database_connection=log://localhost\n")
def test_dbsync_run(self): def test_dbsync_run(self):
subp = subprocess.Popen(["../bin/ceilometer-dbsync", subp = subprocess.Popen(["../bin/ceilometer-dbsync",
"--database_connection=log://localhost"]) "--config-file=%s" % self.tempfile])
self.assertEqual(subp.wait(), 0) self.assertEqual(subp.wait(), 0)
def tearDown(self):
os.unlink(self.tempfile)

View File

@ -17,3 +17,4 @@ python-keystoneclient>=0.2,<0.3
python-swiftclient python-swiftclient
lxml lxml
requests<1.0 requests<1.0
extras

View File

@ -21,7 +21,7 @@ commands = {toxinidir}/run_tests.sh --no-path-adjustment --with-coverage --cover
[testenv:pep8] [testenv:pep8]
deps = pep8==1.3.3 deps = pep8==1.3.3
commands = pep8 --repeat --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests commands = pep8 --repeat --ignore=E125 --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests
[testenv:venv] [testenv:venv]
deps = -r{toxinidir}/tools/test-requires deps = -r{toxinidir}/tools/test-requires