Update openstack.common

Change-Id: Ib9dfbcf3f08b20671aadd76f8789d9a9e72fd2eb
Julien Danjou 2013-07-29 11:58:32 +02:00
parent dbb8ca35fc
commit 11e0766bd7
33 changed files with 875 additions and 667 deletions

ceilometer/openstack/common/config/generator.py Executable file → Normal file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 SINA Corporation
@ -16,8 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Zhongyue Luo, SINA Corporation.
#
"""Extracts OpenStack config option info from module(s)."""
@ -53,7 +50,6 @@ OPT_TYPES = {
MULTISTROPT: 'multi valued',
}
OPTION_COUNT = 0
OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
FLOATOPT, LISTOPT,
MULTISTROPT]))
@ -100,8 +96,6 @@ def generate(srcfiles):
for group, opts in opts_by_group.items():
print_group_opts(group, opts)
print("# Total option count: %d" % OPTION_COUNT)
def _import_module(mod_str):
try:
@ -166,9 +160,7 @@ def _list_opts(obj):
def print_group_opts(group, opts_by_module):
print("[%s]" % group)
print('')
global OPTION_COUNT
for mod, opts in opts_by_module:
OPTION_COUNT += len(opts)
print('#')
print('# Options defined in %s' % mod)
print('#')
@ -189,24 +181,24 @@ def _get_my_ip():
return None
def _sanitize_default(s):
def _sanitize_default(name, value):
"""Set up a reasonably sensible default for pybasedir, my_ip and host."""
if s.startswith(sys.prefix):
if value.startswith(sys.prefix):
# NOTE(jd) Don't use os.path.join, because it is likely to think the
# second part is an absolute pathname and therefore drop the first
# part.
s = os.path.normpath("/usr/" + s[len(sys.prefix):])
elif s.startswith(BASEDIR):
return s.replace(BASEDIR, '/usr/lib/python/site-packages')
elif BASEDIR in s:
return s.replace(BASEDIR, '')
elif s == _get_my_ip():
value = os.path.normpath("/usr/" + value[len(sys.prefix):])
elif value.startswith(BASEDIR):
return value.replace(BASEDIR, '/usr/lib/python/site-packages')
elif BASEDIR in value:
return value.replace(BASEDIR, '')
elif value == _get_my_ip():
return '10.0.0.1'
elif s == socket.gethostname():
elif value == socket.gethostname() and 'host' in name:
return 'ceilometer'
elif s.strip() != s:
return '"%s"' % s
return s
elif value.strip() != value:
return '"%s"' % value
return value
def _print_opt(opt):
@ -227,7 +219,8 @@ def _print_opt(opt):
print('#%s=<None>' % opt_name)
elif opt_type == STROPT:
assert(isinstance(opt_default, basestring))
print('#%s=%s' % (opt_name, _sanitize_default(opt_default)))
print('#%s=%s' % (opt_name, _sanitize_default(opt_name,
opt_default)))
elif opt_type == BOOLOPT:
assert(isinstance(opt_default, bool))
print('#%s=%s' % (opt_name, str(opt_default).lower()))
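
The effect of passing the option name through to _sanitize_default() is that a hostname default is only rewritten when the option actually looks host-related. A minimal standalone sketch of that branch (the function name here is illustrative, not part of the module):

import socket

def sanitize_hostname_default(name, value):
    # Mirrors the new check above: rewrite only when the default equals
    # the local hostname AND the option name mentions 'host'.
    if value == socket.gethostname() and 'host' in name:
        return 'ceilometer'
    return value

print(sanitize_hostname_default('host', socket.gethostname()))    # ceilometer
print(sanitize_hostname_default('flavor', socket.gethostname()))  # unchanged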


@ -18,7 +18,7 @@
"""DB related custom exceptions."""
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
class DBError(Exception):


@ -0,0 +1,159 @@
# coding: utf-8
#
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Based on code in migrate/changeset/databases/sqlite.py, which is under
# the following license:
#
# The MIT License
#
# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from sqlalchemy.schema import UniqueConstraint
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
This feature is needed for _recreate_table() to work properly.
Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
"""
data = table.metadata.bind.execute(
"""SELECT sql
FROM sqlite_master
WHERE
type='table' AND
name=:table_name""",
table_name=table.name
).fetchone()[0]
UNIQUE_PATTERN = r"CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
return [
UniqueConstraint(
*[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
name=name
)
for name, cols in re.findall(UNIQUE_PATTERN, data)
]
def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
"""Recreate the table properly
Unlike the corresponding original method of sqlalchemy-migrate, this one
doesn't drop existing unique constraints when creating a new one.
"""
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
# problems during copy and re-create
for index in table.indexes:
index.drop()
# reflect existing unique constraints
for uc in self._get_unique_constraints(table):
table.append_constraint(uc)
# omit given unique constraints when creating a new table if required
table.constraints = set([
cons for cons in table.constraints
if omit_uniques is None or cons.name not in omit_uniques
])
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column, delta)
table.create(bind=self.connection)
self.append(insertion_string % {'table_name': table_name})
self.execute()
self.append('DROP TABLE migration_tmp')
self.execute()
def _visit_migrate_unique_constraint(self, *p, **k):
"""Drop the given unique constraint
The corresponding original method of sqlalchemy-migrate just
raises a NotImplementedError.
"""
self.recreate_table(p[0].table, omit_uniques=[p[0].name])
def patch_migrate():
"""A workaround for SQLite's inability to alter things
SQLite's ability to alter tables is very limited (please read
http://www.sqlite.org/lang_altertable.html for more details).
E.g. one can't drop a column or a constraint in SQLite. The
workaround for this is to recreate the original table omitting
the corresponding constraint (or column).
The sqlalchemy-migrate library has a recreate_table() method that
implements this workaround, but it does it wrong:
- information about unique constraints of a table
is not retrieved. So if you have a table with one
unique constraint and a migration adding another one
you will end up with a table that has only the
latter unique constraint, and the former will be lost
- dropping of unique constraints is not supported at all
The proper way to fix this is to submit a pull request to
sqlalchemy-migrate, but the project seems to be dead, so we
can go on with monkey-patching the lib, at least for now.
"""
# this patch is needed to ensure that recreate_table() doesn't drop
# existing unique constraints of the table when creating a new one
helper_cls = sqlite.SQLiteHelper
helper_cls.recreate_table = _recreate_table
helper_cls._get_unique_constraints = _get_unique_constraints
# this patch is needed to be able to drop existing unique constraints
constraint_cls = sqlite.SQLiteConstraintDropper
constraint_cls.visit_migrate_unique_constraint = \
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
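
A minimal usage sketch for the monkey-patch above, assuming this module lands as ceilometer.openstack.common.db.sqlalchemy.migration (the filename is not shown in this view; the path matches oslo-incubator):

from ceilometer.openstack.common.db.sqlalchemy import migration

# Apply once, before running SQLite migrations that drop or rebuild
# unique constraints; afterwards sqlalchemy-migrate calls such as
# UniqueConstraint(..., table=t, name='uniq_foo').drop() work on SQLite
# by recreating the table without the named constraint.
migration.patch_migrate()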


@ -22,11 +22,13 @@
SQLAlchemy models.
"""
import six
from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
from ceilometer.openstack.common.db.sqlalchemy.session import get_session
from ceilometer.openstack.common.db.sqlalchemy import session as sa
from ceilometer.openstack.common import timeutils
@ -37,7 +39,7 @@ class ModelBase(object):
def save(self, session=None):
"""Save this object."""
if not session:
session = get_session()
session = sa.get_session()
# NOTE(boris-42): This part of code should look like:
# session.add(self)
# session.flush()
@ -70,12 +72,12 @@ class ModelBase(object):
return self
def next(self):
n = self._i.next()
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
for k, v in six.iteritems(values):
setattr(self, k, v)
def iteritems(self):
@ -84,7 +86,7 @@ class ModelBase(object):
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
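
The six substitutions above are what keep the model's dict protocol working on both Python 2 and 3. A standalone sketch of the two helpers being relied on:

import six

d = {'a': 1, 'b': 2}
# six.iteritems(d) is d.iteritems() on Python 2 and d.items() on Python 3.
for k, v in six.iteritems(d):
    print(k, v)

it = iter([1, 2, 3])
# six.advance_iterator(it) is it.next() on Python 2, next(it) on Python 3.
print(six.advance_iterator(it))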


@ -256,7 +256,7 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from ceilometer.openstack.common.db import exception
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import timeutils


@ -18,12 +18,29 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of paginate query."""
from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import exception
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__)
@ -85,11 +102,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
sort_dir_func = {
'asc': sqlalchemy.asc,
'desc': sqlalchemy.desc,
}[current_sort_dir]
try:
sort_dir_func = {
'asc': sqlalchemy.asc,
'desc': sqlalchemy.desc,
}[current_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
try:
sort_key_attr = getattr(model, current_sort_key)
except AttributeError:
@ -114,11 +134,8 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i]))
elif sort_dirs[i] == 'asc':
crit_attrs.append((model_attr > marker_values[i]))
else:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
crit_attrs.append((model_attr > marker_values[i]))
criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria)
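
Hoisting the direction lookup into a try/except means an invalid sort_dirs entry now fails fast, before any marker criteria are built. An illustrative call (the model and query objects are assumed, not defined here):

# Raises ValueError("Unknown sort direction, must be 'desc' or 'asc'")
# immediately, instead of midway through marker handling.
paginate_query(query, Meter, limit=100,
               sort_keys=['timestamp'], sort_dirs=['descending'])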
@ -130,3 +147,342 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
query = query.limit(limit)
return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
metadata = MetaData()
metadata.bind = engine
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
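
A short usage sketch for the compiled construct above; the engine and table names are hypothetical:

from sqlalchemy import MetaData, Table

meta = MetaData(bind=migrate_engine)                  # assumed engine
src = Table('meters_backup', meta, autoload=True)     # names illustrative
dst = Table('meters', meta, autoload=True)
# Copies every row of src into dst in a single statement, compiling to:
# INSERT INTO meters (SELECT ... FROM meters_backup)
migrate_engine.execute(InsertFromSelect(dst, src.select()))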
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
except KeyError:
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise exception.OpenstackException(message=msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise exception.OpenstackException(message=msg % column_name)
return column
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
This method drops the UC from the table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
type with NullType in metadata. We process these columns and replace
NullType with the correct column type.
:param migrate_engine: sqlalchemy engine
:param table_name: name of the table that contains the unique constraint.
:param uc_name: name of the unique constraint that will be dropped.
:param columns: columns that are in the unique constraint.
:param col_name_col_instance: contains pairs column_name=column_instance.
column_instance is an instance of Column. These params
are required only for columns that have types
unsupported by sqlite. For example BigInteger.
"""
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
if migrate_engine.name == "sqlite":
override_cols = [
_get_not_supported_column(col_name_col_instance, col.name)
for col in t.columns
if isinstance(col.type, NullType)
]
for col in override_cols:
t.columns.replace(col)
uc = UniqueConstraint(*columns, table=t, name=uc_name)
uc.drop()
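
A hypothetical call, dropping a two-column unique constraint (all names invented for illustration):

from sqlalchemy import BigInteger, Column

drop_unique_constraint(migrate_engine, 'alarms', 'uniq_alarm_name',
                       'name', 'project_id')
# For a sqlite column that sqlalchemy reflects as NullType (e.g. a
# BigInteger), pass the real column object so the table can be rebuilt:
drop_unique_constraint(migrate_engine, 'samples', 'uniq_volume', 'volume',
                       volume=Column('volume', BigInteger))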
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
This method drops (or marks as `deleted`, if use_soft_delete is True) old
duplicate rows from the table named `table_name`.
:param migrate_engine: Sqlalchemy engine
:param table_name: Table with duplicates
:param use_soft_delete: If True - values will be marked as `deleted`,
if False - values will be removed from table
:param uc_column_names: Unique constraint columns
"""
meta = MetaData()
meta.bind = migrate_engine
table = Table(table_name, meta, autoload=True)
columns_for_group_by = [table.c[name] for name in uc_column_names]
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove the row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
migrate_engine.execute(delete_statement)
def _get_default_deleted_value(table):
if isinstance(table.c.id.type, Integer):
return 0
if isinstance(table.c.id.type, String):
return ""
raise exception.OpenstackException(
message=_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
table = get_table(migrate_engine, table_name)
insp = reflection.Inspector.from_engine(migrate_engine)
real_indexes = insp.get_indexes(table_name)
existing_index_names = dict(
[(index['name'], index['column_names']) for index in real_indexes])
# NOTE(boris-42): Restore indexes on `deleted` column
for index in indexes:
if 'deleted' not in index['column_names']:
continue
name = index['name']
if name in existing_index_names:
column_names = [table.c[c] for c in existing_index_names[name]]
old_index = Index(name, *column_names, unique=index["unique"])
old_index.drop(migrate_engine)
column_names = [table.c[c] for c in index['column_names']]
new_index = Index(index["name"], *column_names, unique=index["unique"])
new_index.create(migrate_engine)
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_boolean_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
old_deleted = Column('old_deleted', Boolean, default=False)
old_deleted.create(table, populate_default=False)
table.update().\
where(table.c.deleted == table.c.id).\
values(old_deleted=True).\
execute()
table.c.deleted.drop()
table.c.old_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
**col_name_col_instance):
insp = reflection.Inspector.from_engine(migrate_engine)
table = get_table(migrate_engine, table_name)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', Boolean, default=0)
columns.append(column_copy)
constraints = [constraint.copy() for constraint in table.constraints]
meta = MetaData(bind=migrate_engine)
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
c_select = []
for c in table.c:
if c.name != "deleted":
c_select.append(c)
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
new_table.update().\
where(new_table.c.deleted == new_table.c.id).\
values(deleted=True).\
execute()
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_id_type_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
new_deleted = Column('new_deleted', table.c.id.type,
default=_get_default_deleted_value(table))
new_deleted.create(table, populate_default=True)
deleted = True # workaround for pyflakes
table.update().\
where(table.c.deleted == deleted).\
values(new_deleted=table.c.id).\
execute()
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
**col_name_col_instance):
# NOTE(boris-42): sqlalchemy-migrate can't drop a column with check
# constraints in sqlite DB and our `deleted` column has
# 2 check constraints. So there is only one way to remove
# these constraints:
# 1) Create new table with the same columns, constraints
# and indexes. (except deleted column).
# 2) Copy all data from old to new table.
# 3) Drop old table.
# 4) Rename new table to old table name.
insp = reflection.Inspector.from_engine(migrate_engine)
meta = MetaData(bind=migrate_engine)
table = Table(table_name, meta, autoload=True)
default_deleted_value = _get_default_deleted_value(table)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
def is_deleted_column_constraint(constraint):
# NOTE(boris-42): There is no other way to check whether a
# CheckConstraint is associated with the deleted column.
if not isinstance(constraint, CheckConstraint):
return False
sqltext = str(constraint.sqltext)
return (sqltext.endswith("deleted in (0, 1)") or
sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
constraints = []
for constraint in table.constraints:
if not is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
ins = InsertFromSelect(new_table, table.select())
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
deleted = True # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=new_table.c.id).\
execute()
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=default_deleted_value).\
execute()
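
A sketch of how a migration script would call the two conversion helpers; the upgrade/downgrade names follow sqlalchemy-migrate convention and the table name is hypothetical:

def upgrade(migrate_engine):
    # Convert the id-typed `deleted` column to a Boolean.
    change_deleted_column_type_to_boolean(migrate_engine, 'meters')

def downgrade(migrate_engine):
    # Restore `deleted` to the same type as the `id` column.
    change_deleted_column_type_to_id_type(migrate_engine, 'meters')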


@ -33,7 +33,7 @@ class Error(Exception):
class ApiError(Error):
def __init__(self, message='Unknown', code='Unknown'):
self.message = message
self.api_message = message
self.code = code
super(ApiError, self).__init__('%s: %s' % (code, message))
@ -44,19 +44,19 @@ class NotFound(Error):
class UnknownScheme(Error):
msg = "Unknown scheme '%s' found in URI"
msg_fmt = "Unknown scheme '%s' found in URI"
def __init__(self, scheme):
msg = self.__class__.msg % scheme
msg = self.msg_fmt % scheme
super(UnknownScheme, self).__init__(msg)
class BadStoreUri(Error):
msg = "The Store URI %s was malformed. Reason: %s"
msg_fmt = "The Store URI %s was malformed. Reason: %s"
def __init__(self, uri, reason):
msg = self.__class__.msg % (uri, reason)
msg = self.msg_fmt % (uri, reason)
super(BadStoreUri, self).__init__(msg)
@ -100,9 +100,7 @@ def wrap_exception(f):
return f(*args, **kw)
except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise
_wrap.func_name = f.func_name
@ -113,29 +111,29 @@ class OpenstackException(Exception):
"""Base Exception class.
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
a 'msg_fmt' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = "An unknown exception occurred"
msg_fmt = "An unknown exception occurred"
def __init__(self, **kwargs):
try:
self._error_string = self.message % kwargs
self._error_string = self.msg_fmt % kwargs
except Exception:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise
else:
# at least get the core message out if something happened
self._error_string = self.message
self._error_string = self.msg_fmt
def __str__(self):
return self._error_string
class MalformedRequestBody(OpenstackException):
message = "Malformed message body: %(reason)s"
msg_fmt = "Malformed message body: %(reason)s"
class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s"
msg_fmt = "Invalid content type %(content_type)s"


@ -29,8 +29,6 @@ It also allows setting of formatting information through conf.
"""
import ConfigParser
import cStringIO
import inspect
import itertools
import logging
@ -41,6 +39,7 @@ import sys
import traceback
from oslo.config import cfg
from six import moves
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
@ -348,7 +347,7 @@ class LogConfigError(Exception):
def _load_log_config(log_config):
try:
logging.config.fileConfig(log_config)
except ConfigParser.Error as exc:
except moves.configparser.Error as exc:
raise LogConfigError(log_config, str(exc))
@ -521,7 +520,7 @@ class ContextFormatter(logging.Formatter):
if not record:
return logging.Formatter.formatException(self, exc_info)
stringbuffer = cStringIO.StringIO()
stringbuffer = moves.StringIO()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, stringbuffer)
lines = stringbuffer.getvalue().split('\n')


@ -13,12 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
import uuid
from oslo.config import cfg
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log as logging
@ -35,7 +36,7 @@ notifier_opts = [
default='INFO',
help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id',
default='$host',
default=None,
help='Default publisher_id for outgoing notifications'),
]
@ -74,7 +75,7 @@ def notify_decorator(name, fn):
ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
notify(ctxt,
CONF.default_publisher_id,
CONF.default_publisher_id or socket.gethostname(),
name,
CONF.default_notification_level,
body)
@ -84,7 +85,10 @@ def notify_decorator(name, fn):
def publisher_id(service, host=None):
if not host:
host = CONF.host
try:
host = CONF.host
except AttributeError:
host = CONF.default_publisher_id or socket.gethostname()
return "%s.%s" % (service, host)
@ -153,29 +157,16 @@ def _get_drivers():
if _drivers is None:
_drivers = {}
for notification_driver in CONF.notification_driver:
add_driver(notification_driver)
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
return _drivers.values()
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global _drivers


@ -1,119 +0,0 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import log as logging
list_notifier_drivers_opt = cfg.MultiStrOpt(
'list_notifier_drivers',
default=['ceilometer.openstack.common.notifier.no_op_notifier'],
help='List of drivers to send notifications')
CONF = cfg.CONF
CONF.register_opt(list_notifier_drivers_opt)
LOG = logging.getLogger(__name__)
drivers = None
class ImportFailureNotifier(object):
"""Noisily re-raises some exception over-and-over when notify is called."""
def __init__(self, exception):
self.exception = exception
def notify(self, context, message):
raise self.exception
def _get_drivers():
"""Instantiates and returns drivers based on the flag values."""
global drivers
if drivers is None:
drivers = []
for notification_driver in CONF.list_notifier_drivers:
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
return drivers
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
else:
# Driver is already loaded; just add the object.
drivers.append(notification_driver)
def _object_name(obj):
name = []
if hasattr(obj, '__module__'):
name.append(obj.__module__)
if hasattr(obj, '__name__'):
name.append(obj.__name__)
else:
name.append(obj.__class__.__name__)
return '.'.join(name)
def remove_driver(notification_driver):
"""Remove a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
removed = False
if notification_driver in drivers:
# We're removing an object. Easy.
drivers.remove(notification_driver)
removed = True
else:
# We're removing a driver by name. Search for it.
for driver in drivers:
if _object_name(driver) == notification_driver:
drivers.remove(driver)
removed = True
if not removed:
raise ValueError("Cannot remove; %s is not in list" %
notification_driver)
def notify(context, message):
"""Passes notification to multiple notifiers in a list."""
for driver in _get_drivers():
try:
driver.notify(context, message)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification driver %(driver)s."), locals())
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global drivers
drivers = None


@ -1,29 +0,0 @@
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.notifier import rpc_notifier
LOG = logging.getLogger(__name__)
def notify(context, message):
"""Deprecated in Grizzly. Please use rpc_notifier instead."""
LOG.deprecated(_("The rabbit_notifier is now deprecated."
" Please use rpc_notifier instead."))
rpc_notifier.notify(context, message)


@ -16,7 +16,7 @@
from oslo.config import cfg
from ceilometer.openstack.common import context as req_context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc


@ -18,7 +18,7 @@
from oslo.config import cfg
from ceilometer.openstack.common import context as req_context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc


@ -750,7 +750,7 @@ def _parse_text_rule(rule):
return state.result
except ValueError:
# Couldn't parse the rule
LOG.exception(_("Failed to understand rule %(rule)r") % locals())
LOG.exception(_("Failed to understand rule %r") % rule)
# Fail closed
return FalseCheck()


@ -1,247 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import os
import random
import shlex
import signal
from eventlet.green import subprocess
from eventlet import greenthread
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class InvalidArgumentError(Exception):
def __init__(self, message=None):
super(InvalidArgumentError, self).__init__(message)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
% (description, cmd, exit_code, stdout, stderr))
super(ProcessExecutionError, self).__init__(message)
class NoRootWrapSpecified(Exception):
def __init__(self, message=None):
super(NoRootWrapSpecified, self).__init__(message)
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
optional retry.
:param cmd: Passed to subprocess.Popen.
:type cmd: string
:param process_input: Send to opened process.
:type process_input: string
:param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless
program exits with one of these codes.
:type check_exit_code: boolean, int, or [int]
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
:param attempts: How many times to retry cmd.
:type attempts: int
:param run_as_root: True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix to commands called with
run_as_root=True
:type root_helper: string
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
:raises: :class:`ProcessExecutionError`
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
shell = kwargs.pop('shell', False)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
'helper.'))
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
preexec_fn = None
close_fds = False
else:
preexec_fn = _subprocess_setup
close_fds = True
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=close_fds,
preexec_fn=preexec_fn,
shell=shell)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
def trycmd(*args, **kwargs):
"""
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the
command can be considered to have failed.
:discard_warnings True | False. Defaults to False. If set to True,
then for succeeding commands, stderr is cleared
"""
discard_warnings = kwargs.pop('discard_warnings', False)
try:
out, err = execute(*args, **kwargs)
failed = False
except ProcessExecutionError, exn:
out, err = '', str(exn)
failed = True
if not failed and discard_warnings and err:
# Handle commands that output to stderr but otherwise succeed
err = ''
return out, err
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
if process_input:
# This is (probably) fixable if we need it...
raise InvalidArgumentError(_('process_input not supported over SSH'))
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
channel = stdout_stream.channel
# NOTE(justinsb): This seems suspicious...
# ...other SSH clients have buffering issues with this approach
stdout = stdout_stream.read()
stderr = stderr_stream.read()
stdin_stream.close()
exit_status = channel.recv_exit_status()
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return (stdout, stderr)


@ -29,7 +29,7 @@ import inspect
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging


@ -34,14 +34,28 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
deprecated_group='DEFAULT',
help='Use durable queues in amqp.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
help='Auto-delete queues in amqp.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
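
The deprecated_name/deprecated_group pair keeps old configuration files working: a file that still sets rabbit_durable_queues under [DEFAULT] is read through the new option. A minimal standalone sketch (local ConfigOpts, not the global CONF):

from oslo.config import cfg

conf = cfg.ConfigOpts()
conf.register_opts([
    cfg.BoolOpt('amqp_durable_queues',
                default=False,
                deprecated_name='rabbit_durable_queues',
                deprecated_group='DEFAULT',
                help='Use durable queues in amqp.'),
])
# With "[DEFAULT]\nrabbit_durable_queues = true" in a parsed config file,
# conf.amqp_durable_queues evaluates to True.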


@ -24,7 +24,7 @@ import traceback
from oslo.config import cfg
import six
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local
@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
message = self.message % kwargs
message = self.msg_fmt % kwargs
except Exception:
# kwargs doesn't match a variable in the message
@ -90,7 +90,7 @@ class RPCException(Exception):
for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
message = self.message
message = self.msg_fmt
super(RPCException, self).__init__(message)
@ -104,7 +104,7 @@ class RemoteError(RPCException):
contains all of the relevant info.
"""
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
@ -121,7 +121,7 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _('Timeout while waiting on RPC response - '
msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
@ -144,25 +144,25 @@ class Timeout(RPCException):
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException):
message = _("Specified RPC version, %(version)s, not supported by "
msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
@ -261,41 +261,20 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
has_token = 'auth_token' in msg_data
def _fix_passwords(d):
"""Sanitizes the password fields in the dictionary."""
for k in d.iterkeys():
if k.lower().find('password') != -1:
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
if not any([has_method, has_context_token, has_token]):
return log_func(msg, msg_data)
msg_data = copy.deepcopy(msg_data)
if has_method:
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
if has_token:
msg_data['auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
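
The rewritten sanitizer walks the payload recursively instead of consulting a per-method table. A standalone sketch of its behavior on a nested message (keys are illustrative; the real code iterates with iterkeys on Python 2):

SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']

def fix_passwords(d):
    # Mask any key containing 'password' or listed in SANITIZE;
    # recurse into nested dicts.
    for k in list(d):
        if 'password' in k.lower() or k.lower() in SANITIZE:
            d[k] = '<SANITIZED>'
        elif isinstance(d[k], dict):
            fix_passwords(d[k])
    return d

msg = {'method': 'run_instance',
       'args': {'admin_password': 'secret', 'name': 'vm1'},
       'auth_token': 'tok'}
fix_passwords(msg)
# -> admin_password and auth_token both become '<SANITIZED>'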
def serialize_remote_exception(failure_info, log_failure=True):


@ -18,7 +18,6 @@ import functools
import itertools
import socket
import ssl
import sys
import time
import uuid
@ -31,15 +30,19 @@ import kombu.messaging
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common.rpc import amqp as rpc_amqp
from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
help='SSL version to use (valid only if SSL enabled)'),
help='SSL version to use (valid only if SSL enabled). '
'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions.'
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
@ -83,9 +86,6 @@ kombu_opts = [
default=0,
help='maximum retries when trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@ -257,9 +257,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@ -363,8 +363,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.rabbit_durable_queues,
'auto_delete': False,
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
@ -394,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
@ -478,7 +478,8 @@ class Connection(object):
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.conf.kombu_ssl_version
ssl_params['ssl_version'] = sslutils.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
@ -489,12 +490,8 @@ class Connection(object):
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
if not ssl_params:
# Just have the default behavior
return True
else:
# Return the extended behavior
return ssl_params
# Return the extended behavior or just have the default behavior
return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
@ -561,13 +558,11 @@ class Connection(object):
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
LOG.error(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
sys.exit(1)
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info
LOG.error(msg)
raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
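
Stepping back to the kombu_ssl_version change earlier in this file: validate_ssl_version (from openstack.common's sslutils) maps the configured string onto an ssl.PROTOCOL_* constant rather than handing the raw string to kombu. A rough, assumed equivalent:

import ssl

_SSL_VERSIONS = {
    'tlsv1': ssl.PROTOCOL_TLSv1,
    'sslv23': ssl.PROTOCOL_SSLv23,
}
# PROTOCOL_SSLv3 is missing from some Python builds, hence the guard.
if hasattr(ssl, 'PROTOCOL_SSLv3'):
    _SSL_VERSIONS['sslv3'] = ssl.PROTOCOL_SSLv3

def validate_ssl_version(version):
    # Case-insensitive lookup; unknown values are rejected up front.
    try:
        return _SSL_VERSIONS[version.lower()]
    except KeyError:
        raise ValueError("Invalid SSL version: %s" % version)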


@ -25,7 +25,7 @@ import greenlet
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log as logging
@ -181,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase):
@ -203,9 +208,14 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase):
@ -228,7 +238,7 @@ class FanoutConsumer(ConsumerBase):
{"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name()
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,


@ -27,7 +27,7 @@ import greenlet
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common.rpc import common as rpc_common


@ -23,7 +23,7 @@ import contextlib
import eventlet
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
@ -248,9 +248,7 @@ class DirectBinding(Binding):
that it maps directly to a host, thus direct.
"""
def test(self, key):
if '.' in key:
return True
return False
return '.' in key
class TopicBinding(Binding):
@ -262,17 +260,13 @@ class TopicBinding(Binding):
matches that of a direct exchange.
"""
def test(self, key):
if '.' not in key:
return True
return False
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
if key.startswith('fanout~'):
return True
return False
return key.startswith('fanout~')
class StubExchange(Exchange):


@ -23,7 +23,7 @@ import json
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import matchmaker as mm
@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
return key in self.ring0
class RoundRobinRingExchange(RingExchange):


@ -69,7 +69,7 @@ class RpcProxy(object):
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):


@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
@ -32,10 +32,11 @@ class Service(service.Service):
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None):
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
@ -48,7 +49,8 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)


@ -81,6 +81,15 @@ class Launcher(object):
"""
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
@ -93,24 +102,31 @@ class ServiceLauncher(Launcher):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signo)
def wait(self):
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
@ -121,7 +137,16 @@ class ServiceLauncher(Launcher):
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status
return status, signo
def wait(self):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
return status
self.restart()
class ServiceWrapper(object):
@ -139,9 +164,12 @@ class ProcessLauncher(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
@@ -150,6 +178,7 @@ class ProcessLauncher(object):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@@ -160,16 +189,47 @@ class ProcessLauncher(object):
sys.exit(1)
def _child_process(self, service):
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = None
signo = 0
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
@@ -184,7 +244,7 @@ class ProcessLauncher(object):
launcher = Launcher()
launcher.launch_service(service)
launcher.wait()
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@@ -205,21 +265,13 @@ class ProcessLauncher(object):
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.service)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if signo != signal.SIGHUP:
break
launcher.restart()
os._exit(status)
@@ -265,12 +317,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
@@ -279,14 +326,30 @@
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
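The master process follows the same protocol as the single-process launcher: on SIGHUP it logs the signal, forwards it to every child via os.kill, clears `running` and `sigcaught`, and re-enters the respawn loop, so the whole worker tree reloads without the master exiting; only non-HUP signals fall through to the child shutdown loop below. Sketched end to end (master_pid is hypothetical):

    os.kill(master_pid, signal.SIGHUP)
    # master: for pid in self.children: os.kill(pid, signal.SIGHUP)
    # child:  _sighup() raises SignalExit(SIGHUP); the child loop calls
    #         launcher.restart() and keeps serving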
@@ -311,6 +374,10 @@ class Service(object):
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
@@ -353,6 +420,13 @@ class Services(object):
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.


@@ -227,15 +227,7 @@
# Default publisher_id for outgoing notifications (string
# value)
#default_publisher_id=$host
#
# Options defined in ceilometer.openstack.common.notifier.list_notifier
#
# List of drivers to send notifications (multi valued)
#list_notifier_drivers=ceilometer.openstack.common.notifier.no_op_notifier
#default_publisher_id=<None>
#
@@ -292,12 +284,24 @@
#control_exchange=openstack
#
# Options defined in ceilometer.openstack.common.rpc.amqp
#
# Use durable queues in amqp. (boolean value)
#amqp_durable_queues=false
# Auto-delete queues in amqp. (boolean value)
#amqp_auto_delete=false
#
# Options defined in ceilometer.openstack.common.rpc.impl_kombu
#
# SSL version to use (valid only if SSL enabled) (string
# value)
# SSL version to use (valid only if SSL enabled). valid values
# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
# distributions (string value)
#kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value)
@@ -346,9 +350,6 @@
# value)
#rabbit_max_retries=0
# use durable queues in RabbitMQ (boolean value)
#rabbit_durable_queues=false
# use H/A queues in RabbitMQ (x-ha-policy: all).You need to
# wipe RabbitMQ database when changing this option. (boolean
# value)
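Note the upgrade impact in this part of the sample: queue durability and auto-delete moved up from the kombu driver into the shared amqp layer, and rabbit_durable_queues is gone in favour of amqp_durable_queues. An existing deployment's config would be adjusted along these lines (values illustrative):

    [DEFAULT]
    # before: rabbit_durable_queues=true
    amqp_durable_queues=true
    amqp_auto_delete=false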
@@ -713,4 +714,3 @@
#password=<None>
# Total option count: 134


@@ -17,5 +17,5 @@ module=rpc
module=service
module=threadgroup
module=timeutils
module=config.generator
module=config
base=ceilometer


@@ -1,9 +0,0 @@
#!/bin/sh
TMPFILE=`mktemp`
trap "rm -f ${TMPFILE}" EXIT
tools/conf/generate_sample.sh "${TMPFILE}"
if ! diff "${TMPFILE}" etc/ceilometer/ceilometer.conf.sample
then
echo "E: ceilometer.conf.sample is not up to date, please run tools/conf/generate_sample.sh"
exit 42
fi


@@ -1,30 +0,0 @@
#!/usr/bin/env bash
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 SINA Corporation
# All Rights Reserved.
# Author: Zhongyue Luo <lzyeval@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
OS_VARS=$(set | sed -n '/^OS_/s/=[^=*]*$//gp' | xargs)
[ "$OS_VARS" ] && eval "unset \$OS_VARS"
FILES=$(find ceilometer -type f -name "*.py" ! -path "ceilometer/tests/*" -exec \
grep -l "Opt(" {} \; | sort -u)
DEST=${1:-etc/ceilometer/ceilometer.conf.sample}
PYTHONPATH=./:${PYTHONPATH} \
python $(dirname "$0")/../../ceilometer/openstack/common/config/generator.py ${FILES} > $DEST

tools/config/check_uptodate.sh Executable file

@@ -0,0 +1,9 @@
#!/bin/sh
TEMPDIR=`mktemp -d`
CFGFILE=ceilometer.conf.sample
tools/config/generate_sample.sh -b ./ -p ceilometer -o $TEMPDIR
if ! diff $TEMPDIR/$CFGFILE etc/ceilometer/$CFGFILE
then
echo "E: ceilometer.conf.sample is not up to date, please run tools/config/generate_sample.sh"
exit 42
fi

tools/config/generate_sample.sh Executable file

@@ -0,0 +1,69 @@
#!/usr/bin/env bash
print_hint() {
echo "Try \`${0##*/} --help' for more information." >&2
}
PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \
--long help,base-dir:,package-name:,output-dir: -- "$@")
if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
eval set -- "$PARSED_OPTIONS"
while true; do
case "$1" in
-h|--help)
echo "${0##*/} [options]"
echo ""
echo "options:"
echo "-h, --help show brief help"
echo "-b, --base-dir=DIR Project base directory (required)"
echo "-p, --package-name=NAME Project package name"
echo "-o, --output-dir=DIR File output directory"
exit 0
;;
-b|--base-dir)
shift
BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
-p|--package-name)
shift
PACKAGENAME=`echo $1`
shift
;;
-o|--output-dir)
shift
OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
--)
break
;;
esac
done
if [ -z $BASEDIR ] || ! [ -d $BASEDIR ]
then
echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
fi
PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}}
OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
if ! [ -d $OUTPUTDIR ]
then
echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
exit 1
fi
BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \
-exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
export EVENTLET_NO_GREENDNS=yes
MODULEPATH=ceilometer.openstack.common.config.generator
OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
python -m $MODULEPATH $FILES > $OUTPUTFILE
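Unlike the old positional script, the generator is now driven by getopt-style options: only --base-dir is required, the package name defaults to the base directory's basename, and the output directory defaults to $BASEDIR/etc. The check_uptodate.sh invocation above, "generate_sample.sh -b ./ -p ceilometer -o $TEMPDIR", is the canonical example; to refresh the checked-in sample, something like "tools/config/generate_sample.sh -b . -p ceilometer -o etc/ceilometer" should work, the -o path being inferred from the repository layout rather than stated here.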


@@ -8,7 +8,7 @@ setenv = VIRTUAL_ENV={envdir}
EVENTLET_NO_GREENDNS=yes
commands =
bash -x {toxinidir}/run-tests.sh {posargs}
{toxinidir}/tools/conf/check_uptodate.sh
{toxinidir}/tools/config/check_uptodate.sh
sitepackages = False
downloadcache = {toxworkdir}/_download