Merge "Update oslo, use new configuration generator"

This commit is contained in:
Jenkins 2013-05-29 00:19:36 +00:00 committed by Gerrit Code Review
commit 57eab05229
27 changed files with 995 additions and 285 deletions

View File

@ -18,10 +18,8 @@
#
# @author: Zhongyue Luo, SINA Corporation.
#
"""Extracts OpenStack config option info from module(s)."""
import gettext
import imp
import os
import re
@ -31,9 +29,10 @@ import textwrap
from oslo.config import cfg
from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common import importutils
gettext.install('ceilometer', unicode=1)
gettextutils.install('ceilometer')
STROPT = "StrOpt"
BOOLOPT = "BoolOpt"
@ -61,7 +60,7 @@ BASEDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
WORDWRAP_WIDTH = 60
def main(srcfiles):
def generate(srcfiles):
mods_by_pkg = dict()
for filepath in srcfiles:
pkg_name = filepath.split(os.sep)[1]
@ -107,12 +106,10 @@ def _import_module(mod_str):
return sys.modules[mod_str[4:]]
else:
return importutils.import_module(mod_str)
except (ValueError, AttributeError), err:
return None
except ImportError, ie:
except ImportError as ie:
sys.stderr.write("%s\n" % str(ie))
return None
except Exception, e:
except Exception:
return None
@ -135,7 +132,11 @@ def _guess_groups(opt, mod_obj):
if _is_in_group(opt, value._group):
return value._group.name
raise RuntimeError("Unable to find group for option %s" % opt.name)
raise RuntimeError(
"Unable to find group for option %s, "
"maybe it's defined twice in the same group?"
% opt.name
)
def _list_opts(obj):
@ -193,7 +194,7 @@ def _sanitize_default(s):
elif s == _get_my_ip():
return '10.0.0.1'
elif s == socket.getfqdn():
return 'nova'
return 'ceilometer'
elif s.strip() != s:
return '"%s"' % s
return s
@ -242,8 +243,11 @@ def _print_opt(opt):
sys.exit(1)
if __name__ == '__main__':
def main():
if len(sys.argv) < 2:
print "usage: python %s [srcfile]...\n" % sys.argv[0]
print "usage: %s [srcfile]...\n" % sys.argv[0]
sys.exit(0)
main(sys.argv[1:])
generate(sys.argv[1:])
if __name__ == '__main__':
main()

View File

@ -19,8 +19,9 @@
Supported configuration options:
`db_backend`: DB backend name or full module path to DB backend module.
`dbapi_use_tpool`: Enable thread pooling of DB API calls.
The following two parameters are in the 'database' group:
`backend`: DB backend name or full module path to DB backend module.
`use_tpool`: Enable thread pooling of DB API calls.
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
@ -44,17 +45,21 @@ from ceilometer.openstack.common import lockutils
db_opts = [
cfg.StrOpt('db_backend',
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.BoolOpt('dbapi_use_tpool',
cfg.BoolOpt('use_tpool',
default=False,
deprecated_name='dbapi_use_tpool',
deprecated_group='DEFAULT',
help='Enable the experimental use of thread pooling for '
'all DB API calls')
]
CONF = cfg.CONF
CONF.register_opts(db_opts)
CONF.register_opts(db_opts, 'database')
class DBAPI(object):
@ -75,8 +80,8 @@ class DBAPI(object):
if self.__backend:
# Another thread assigned it
return self.__backend
backend_name = CONF.db_backend
self.__use_tpool = CONF.dbapi_use_tpool
backend_name = CONF.database.backend
self.__use_tpool = CONF.database.use_tpool
if self.__use_tpool:
from eventlet import tpool
self.__tpool = tpool

View File

@ -33,9 +33,6 @@ from ceilometer.openstack.common import timeutils
class ModelBase(object):
"""Base class for models."""
__table_initialized__ = False
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
metadata = None
def save(self, session=None):
"""Save this object."""
@ -92,6 +89,11 @@ class ModelBase(object):
return local.iteritems()
class TimestampMixin(object):
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
class SoftDeleteMixin(object):
deleted_at = Column(DateTime)
deleted = Column(Integer, default=0)

View File

@ -247,8 +247,10 @@ import time
from eventlet import greenthread
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
import sqlalchemy.interfaces
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
@ -258,53 +260,76 @@ from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import timeutils
DEFAULT = 'DEFAULT'
sql_opts = [
cfg.StrOpt('sql_connection',
sqlite_db_opts = [
cfg.StrOpt('sqlite_db',
default='ceilometer.sqlite',
help='the filename to use with sqlite'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If true, use synchronous mode for sqlite'),
]
database_opts = [
cfg.StrOpt('connection',
default='sqlite:///' +
os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
deprecated_name='sql_connection',
deprecated_group=DEFAULT,
secret=True),
cfg.StrOpt('sqlite_db',
default='ceilometer.sqlite',
help='the filename to use with sqlite'),
cfg.IntOpt('sql_idle_timeout',
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_name='sql_idle_timeout',
deprecated_group=DEFAULT,
help='timeout before idle sql connections are reaped'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If passed, use synchronous mode for sqlite'),
cfg.IntOpt('sql_min_pool_size',
cfg.IntOpt('min_pool_size',
default=1,
deprecated_name='sql_min_pool_size',
deprecated_group=DEFAULT,
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('sql_max_pool_size',
cfg.IntOpt('max_pool_size',
default=5,
deprecated_name='sql_max_pool_size',
deprecated_group=DEFAULT,
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('sql_max_retries',
cfg.IntOpt('max_retries',
default=10,
deprecated_name='sql_max_retries',
deprecated_group=DEFAULT,
help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('sql_retry_interval',
cfg.IntOpt('retry_interval',
default=10,
deprecated_name='sql_retry_interval',
deprecated_group=DEFAULT,
help='interval between retries of opening a sql connection'),
cfg.IntOpt('sql_max_overflow',
cfg.IntOpt('max_overflow',
default=None,
deprecated_name='sql_max_overflow',
deprecated_group=DEFAULT,
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('sql_connection_debug',
cfg.IntOpt('connection_debug',
default=0,
deprecated_name='sql_connection_debug',
deprecated_group=DEFAULT,
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('sql_connection_trace',
cfg.BoolOpt('connection_trace',
default=False,
deprecated_name='sql_connection_trace',
deprecated_group=DEFAULT,
help='Add python stack traces to SQL as comment strings'),
]
CONF = cfg.CONF
CONF.register_opts(sql_opts)
CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__)
_ENGINE = None
@ -313,17 +338,42 @@ _MAKER = None
def set_defaults(sql_connection, sqlite_db):
"""Set defaults for configuration variables."""
cfg.set_defaults(sql_opts,
sql_connection=sql_connection,
cfg.set_defaults(database_opts,
connection=sql_connection)
cfg.set_defaults(sqlite_db_opts,
sqlite_db=sqlite_db)
def get_session(autocommit=True, expire_on_commit=False):
def cleanup():
global _ENGINE, _MAKER
if _MAKER:
_MAKER.close_all()
_MAKER = None
if _ENGINE:
_ENGINE.dispose()
_ENGINE = None
class SqliteForeignKeysListener(PoolListener):
"""
Ensures that the foreign key constraints are enforced in SQLite.
The foreign key constraints are disabled by default in SQLite,
so the foreign key constraints will be enabled here for every
database connection
"""
def connect(self, dbapi_con, con_record):
dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False,
sqlite_fk=False):
"""Return a SQLAlchemy session."""
global _MAKER
if _MAKER is None:
engine = get_engine()
engine = get_engine(sqlite_fk=sqlite_fk)
_MAKER = get_maker(engine, autocommit, expire_on_commit)
session = _MAKER()
@ -355,21 +405,22 @@ _DUP_KEY_RE_DB = {
}
def raise_if_duplicate_entry_error(integrity_error, engine_name):
def _raise_if_duplicate_entry_error(integrity_error, engine_name):
"""
In this function will be raised DBDuplicateEntry exception if integrity
error wrap unique constraint violation.
"""
def get_columns_from_uniq_cons_or_name(columns):
# note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3"
# means that columns c1, c2, c3 are in UniqueConstraint.
# note(vsergeyev): UniqueConstraint name convention: "uniq_t$c1$c2"
# where `t` it is table name and columns `c1`, `c2`
# are in UniqueConstraint.
uniqbase = "uniq_"
if not columns.startswith(uniqbase):
if engine_name == "postgresql":
return [columns[columns.index("_") + 1:columns.rindex("_")]]
return [columns]
return columns[len(uniqbase):].split("_x_")
return columns[len(uniqbase):].split("$")[1:]
if engine_name not in ["mysql", "sqlite", "postgresql"]:
return
@ -397,7 +448,7 @@ _DEADLOCK_RE_DB = {
}
def raise_if_deadlock_error(operational_error, engine_name):
def _raise_if_deadlock_error(operational_error, engine_name):
"""
Raise DBDeadlock exception if OperationalError contains a Deadlock
condition.
@ -411,7 +462,7 @@ def raise_if_deadlock_error(operational_error, engine_name):
raise exception.DBDeadlock(operational_error)
def wrap_db_error(f):
def _wrap_db_error(f):
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
@ -420,49 +471,50 @@ def wrap_db_error(f):
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.OperationalError, e:
raise_if_deadlock_error(e, get_engine().name)
except sqla_exc.OperationalError as e:
_raise_if_deadlock_error(e, get_engine().name)
# NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now.
raise
except sqla_exc.IntegrityError, e:
except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example
# instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate
# unique constraint, from error message.
raise_if_duplicate_entry_error(e, get_engine().name)
_raise_if_duplicate_entry_error(e, get_engine().name)
raise exception.DBError(e)
except Exception, e:
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap
def get_engine():
def get_engine(sqlite_fk=False):
"""Return a SQLAlchemy engine."""
global _ENGINE
if _ENGINE is None:
_ENGINE = create_engine(CONF.sql_connection)
_ENGINE = create_engine(CONF.database.connection,
sqlite_fk=sqlite_fk)
return _ENGINE
def synchronous_switch_listener(dbapi_conn, connection_rec):
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def add_regexp_listener(dbapi_con, con_record):
def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(unicode(item)) is not None
return reg.search(six.text_type(item)) is not None
dbapi_con.create_function('regexp', 2, regexp)
def greenthread_yield(dbapi_con, con_record):
def _greenthread_yield(dbapi_con, con_record):
"""
Ensure other greenthreads get a chance to execute by forcing a context
switch. With common database backends (eg MySQLdb and sqlite), there is
@ -472,7 +524,7 @@ def greenthread_yield(dbapi_con, con_record):
greenthread.sleep(0)
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
@ -482,7 +534,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError, ex:
except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away")
@ -490,7 +542,7 @@ def ping_listener(dbapi_conn, connection_rec, connection_proxy):
raise
def is_db_connection_error(args):
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
@ -501,56 +553,58 @@ def is_db_connection_error(args):
return False
def create_engine(sql_connection):
def create_engine(sql_connection, sqlite_fk=False):
"""Return a new SQLAlchemy engine."""
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
"pool_recycle": CONF.sql_idle_timeout,
"pool_recycle": CONF.database.idle_timeout,
"echo": False,
'convert_unicode': True,
}
# Map our SQL debug level to SQLAlchemy's options
if CONF.sql_connection_debug >= 100:
if CONF.database.connection_debug >= 100:
engine_args['echo'] = 'debug'
elif CONF.sql_connection_debug >= 50:
elif CONF.database.connection_debug >= 50:
engine_args['echo'] = True
if "sqlite" in connection_dict.drivername:
if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool
if CONF.sql_connection == "sqlite://":
if CONF.database.connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
engine_args['pool_size'] = CONF.sql_max_pool_size
if CONF.sql_max_overflow is not None:
engine_args['max_overflow'] = CONF.sql_max_overflow
engine_args['pool_size'] = CONF.database.max_pool_size
if CONF.database.max_overflow is not None:
engine_args['max_overflow'] = CONF.database.max_overflow
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', greenthread_yield)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', ping_listener)
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', add_regexp_listener)
_synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
if (CONF.sql_connection_trace and
if (CONF.database.connection_trace and
engine.dialect.dbapi.__name__ == 'MySQLdb'):
patch_mysqldb_with_stacktrace_comments()
_patch_mysqldb_with_stacktrace_comments()
try:
engine.connect()
except sqla_exc.OperationalError, e:
if not is_db_connection_error(e.args[0]):
except sqla_exc.OperationalError as e:
if not _is_db_connection_error(e.args[0]):
raise
remaining = CONF.sql_max_retries
remaining = CONF.database.max_retries
if remaining == -1:
remaining = 'infinite'
while True:
@ -558,13 +612,13 @@ def create_engine(sql_connection):
LOG.warn(msg % remaining)
if remaining != 'infinite':
remaining -= 1
time.sleep(CONF.sql_retry_interval)
time.sleep(CONF.database.retry_interval)
try:
engine.connect()
break
except sqla_exc.OperationalError, e:
except sqla_exc.OperationalError as e:
if (remaining != 'infinite' and remaining == 0) or \
not is_db_connection_error(e.args[0]):
not _is_db_connection_error(e.args[0]):
raise
return engine
@ -580,15 +634,15 @@ class Query(sqlalchemy.orm.query.Query):
class Session(sqlalchemy.orm.session.Session):
"""Custom Session class to avoid SqlAlchemy Session monkey patching."""
@wrap_db_error
@_wrap_db_error
def query(self, *args, **kwargs):
return super(Session, self).query(*args, **kwargs)
@wrap_db_error
@_wrap_db_error
def flush(self, *args, **kwargs):
return super(Session, self).flush(*args, **kwargs)
@wrap_db_error
@_wrap_db_error
def execute(self, *args, **kwargs):
return super(Session, self).execute(*args, **kwargs)
@ -602,7 +656,7 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
query_cls=Query)
def patch_mysqldb_with_stacktrace_comments():
def _patch_mysqldb_with_stacktrace_comments():
"""Adds current stack trace as a comment in queries by patching
MySQLdb.cursors.BaseCursor._do_query.
"""

View File

@ -105,9 +105,9 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Build up an array of sort criteria as in the docstring
criteria_list = []
for i in xrange(0, len(sort_keys)):
for i in range(0, len(sort_keys)):
crit_attrs = []
for j in xrange(0, i):
for j in range(0, i):
model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j]))

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import gc
import pprint
import sys
@ -37,7 +39,7 @@ CONF.register_opts(eventlet_backdoor_opts)
def _dont_use_this():
print "Don't use this, just disconnect instead"
print("Don't use this, just disconnect instead")
def _find_objects(t):
@ -46,16 +48,16 @@ def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
print(i, gt)
traceback.print_stack(gt.gr_frame)
print
print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print threadId
print(threadId)
traceback.print_stack(stack)
print
print()
def initialize_if_enabled():

View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import os
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise

View File

@ -41,6 +41,8 @@ import json
import types
import xmlrpclib
import six
from ceilometer.openstack.common import timeutils
@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# value of itertools.count doesn't get caught by nasty_type_tests
# and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
return unicode(value)
return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
return recursive(value.__dict__, level=level + 1)
else:
if any(test(value) for test in _nasty_type_tests):
return unicode(value)
return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)
return six.text_type(value)
def dumps(value, default=to_primitive, **kwargs):

View File

@ -0,0 +1,278 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import functools
import os
import shutil
import tempfile
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from ceilometer.openstack.common import fileutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
help=('Directory to use for lock files. Default to a '
'temp directory'))
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then CONF.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)

View File

@ -43,12 +43,11 @@ import traceback
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local
from ceilometer.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
@ -73,11 +72,13 @@ logging_cli_opts = [
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT,
default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
'This option is deprecated. Please use '
'logging_context_format_string and '
'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
@ -321,17 +322,6 @@ class JSONFormatter(logging.Formatter):
return jsonutils.dumps(message)
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('ceilometer.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.msg))
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
@ -427,15 +417,22 @@ def _setup_logging_from_conf():
log_root.addHandler(streamlog)
if CONF.publish_errors:
log_root.addHandler(PublishErrorsHandler(logging.ERROR))
handler = importutils.import_object(
"ceilometer.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler)
for handler in log_root.handlers:
datefmt = CONF.log_date_format
for handler in log_root.handlers:
# NOTE(alaski): CONF.log_format overrides everything currently. This
# should be deprecated in favor of context aware formatting.
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
log_root.info('Deprecated: log_format is now deprecated and will '
'be removed in the next release')
else:
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)
@ -481,7 +478,7 @@ class WritableLogger(object):
self.logger.log(self.level, msg)
class LegacyFormatter(logging.Formatter):
class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string

View File

@ -60,6 +60,7 @@ import abc
import re
import urllib
import six
import urllib2
from ceilometer.openstack.common.gettextutils import _
@ -436,7 +437,7 @@ def _parse_list_rule(rule):
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
if len(or_list) == 0:
if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
@ -775,5 +776,5 @@ class GenericCheck(Check):
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
if self.kind in creds:
return match == unicode(creds[self.kind])
return match == six.text_type(creds[self.kind])
return False

View File

@ -19,8 +19,10 @@
System-level utilities and helper functions.
"""
import os
import random
import shlex
import signal
from eventlet.green import subprocess
from eventlet import greenthread
@ -32,6 +34,11 @@ from ceilometer.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class InvalidArgumentError(Exception):
def __init__(self, message=None):
super(InvalidArgumentError, self).__init__(message)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
@ -40,6 +47,12 @@ class UnknownArgumentError(Exception):
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
@ -49,6 +62,17 @@ class ProcessExecutionError(Exception):
super(ProcessExecutionError, self).__init__(message)
class NoRootWrapSpecified(Exception):
def __init__(self, message=None):
super(NoRootWrapSpecified, self).__init__(message)
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
@ -58,11 +82,11 @@ def execute(*cmd, **kwargs):
:type cmd: string
:param process_input: Send to opened process.
:type proces_input: string
:param check_exit_code: Defaults to 0. Will raise
:class:`ProcessExecutionError`
if the command exits without returning this value
as a returncode
:type check_exit_code: int
:param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless
program exits with one of these code.
:type check_exit_code: boolean, int, or [int]
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
@ -72,8 +96,12 @@ def execute(*cmd, **kwargs):
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix all cmd's with
:param root_helper: command to prefix to commands called with
run_as_root=True
:type root_helper: string
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
@ -81,16 +109,31 @@ def execute(*cmd, **kwargs):
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0)
check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
if len(kwargs):
shell = kwargs.pop('shell', False)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if kwargs:
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root:
if run_as_root and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
'helper.'))
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
@ -98,11 +141,21 @@ def execute(*cmd, **kwargs):
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
preexec_fn = None
close_fds = False
else:
preexec_fn = _subprocess_setup
close_fds = True
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=True)
close_fds=close_fds,
preexec_fn=preexec_fn,
shell=shell)
result = None
if process_input is not None:
result = obj.communicate(process_input)
@ -112,9 +165,7 @@ def execute(*cmd, **kwargs):
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
@ -133,3 +184,64 @@ def execute(*cmd, **kwargs):
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
def trycmd(*args, **kwargs):
"""
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the
command can be considered to have failed.
:discard_warnings True | False. Defaults to False. If set to True,
then for succeeding commands, stderr is cleared
"""
discard_warnings = kwargs.pop('discard_warnings', False)
try:
out, err = execute(*args, **kwargs)
failed = False
except ProcessExecutionError, exn:
out, err = '', str(exn)
failed = True
if not failed and discard_warnings and err:
# Handle commands that output to stderr but otherwise succeed
err = ''
return out, err
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd)
if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH'))
if process_input:
# This is (probably) fixable if we need it...
raise InvalidArgumentError(_('process_input not supported over SSH'))
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
channel = stdout_stream.channel
# NOTE(justinsb): This seems suspicious...
# ...other SSH clients have buffering issues with this approach
stdout = stdout_stream.read()
stderr = stderr_stream.read()
stdin_stream.close()
exit_status = channel.recv_exit_status()
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status)
if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return (stdout, stderr)

View File

@ -197,8 +197,9 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %s'
', message : %s') % (msg_id, message_data))
LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
else:
waiter.put(message_data)

View File

@ -22,6 +22,7 @@ import sys
import traceback
from oslo.config import cfg
import six
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
@ -157,6 +158,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@ -299,7 +304,8 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(_("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)
kwargs = {}
@ -309,7 +315,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
'message': unicode(failure),
'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs

View File

@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
"""
from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
@ -93,16 +94,38 @@ class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
def __init__(self, callbacks):
def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
:param serializer: The Serializer object that will be used to
deserialize arguments before the method call and
to serialize the result after it returns.
"""
self.callbacks = callbacks
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcDispatcher, self).__init__()
def _deserialize_args(self, context, kwargs):
"""Helper method called to deserialize args before dispatch.
This calls our serializer on each argument, returning a new set of
args that have been deserialized.
:param context: The request context
:param kwargs: The arguments to be deserialized
:returns: A new set of deserialized args
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
@ -145,7 +168,9 @@ class RpcDispatcher(object):
if not hasattr(proxyobj, method):
continue
if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
kwargs = self._deserialize_args(ctxt, kwargs)
result = getattr(proxyobj, method)(ctxt, **kwargs)
return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)

View File

@ -331,15 +331,16 @@ class Connection(object):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
attempt = 0
delay = 1
while True:
# Close the session if necessary
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
attempt = 0
delay = 1
while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
@ -374,7 +375,7 @@ class Connection(object):
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e:
qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()

View File

@ -180,7 +180,7 @@ class ZmqSocket(object):
return
# We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0:
if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@ -763,7 +763,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if len(queues) == 0:
if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
@ -846,6 +846,11 @@ def _get_ctxt():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
matchmaker = importutils.import_object(
CONF.rpc_zmq_matchmaker, *args, **kwargs)
mm = CONF.rpc_zmq_matchmaker
if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker

View File

@ -19,8 +19,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import itertools
import json
import eventlet
from oslo.config import cfg
@ -30,10 +28,6 @@ from ceilometer.openstack.common import log as logging
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
@ -236,7 +230,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""
@ -245,7 +240,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if len(self.hosts) == 0:
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
@ -304,67 +299,6 @@ class StubExchange(Exchange):
return [(key, None)]
class RingExchange(Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
@ -388,17 +322,6 @@ class DirectExchange(Exchange):
return [(key, e)]
class MatchMakerRing(MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
class MatchMakerLocalhost(MatchMakerBase):
"""
Match Maker where all bare topics resolve to localhost.

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import matchmaker as mm
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -23,6 +23,8 @@ For more information about rpc API version numbers, see:
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
@ -34,16 +36,28 @@ class RpcProxy(object):
rpc API.
"""
def __init__(self, topic, default_version):
# The default namespace, which can be overriden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,
serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
:param version_cap: Optionally cap the maximum version used for sent
messages.
:param serializer: Optionaly (de-)serialize entities with a
provided helper.
"""
self.topic = topic
self.default_version = default_version
self.version_cap = version_cap
if serializer is None:
serializer = rpc_serializer.NoOpSerializer()
self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
@ -52,7 +66,11 @@ class RpcProxy(object):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
msg['version'] = vers if vers else self.default_version
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
@ -62,9 +80,25 @@ class RpcProxy(object):
def make_namespaced_msg(method, namespace, **kwargs):
return {'method': method, 'namespace': namespace, 'args': kwargs}
@staticmethod
def make_msg(method, **kwargs):
return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def make_msg(self, method, **kwargs):
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
**kwargs)
def _serialize_msg_args(self, context, kwargs):
"""Helper method called to serialize message arguments.
This calls our serializer on each argument, returning a new
set of args that have been serialized.
:param context: The request context
:param kwargs: The arguments to serialize
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
for argname, arg in kwargs.iteritems():
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
@ -81,9 +115,11 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
return rpc.call(context, real_topic, msg, timeout)
result = rpc.call(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@ -104,9 +140,11 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
real_topic = self._get_topic(topic)
try:
return rpc.multicall(context, real_topic, msg, timeout)
result = rpc.multicall(context, real_topic, msg, timeout)
return self.serializer.deserialize_entity(context, result)
except rpc.common.Timeout as exc:
raise rpc.common.Timeout(
exc.info, real_topic, msg.get('method'))
@ -124,6 +162,7 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@ -139,6 +178,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@ -157,6 +197,7 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@ -175,5 +216,6 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)

View File

@ -0,0 +1,52 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides the definition of an RPC serialization handler"""
import abc
class Serializer(object):
"""Generic (de-)serialization definition base class"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):
"""Serialize something to primitive form.
:param context: Security context
:param entity: Entity to be serialized
:returns: Serialized form of entity
"""
pass
@abc.abstractmethod
def deserialize_entity(self, context, entity):
"""Deserialize something from primitive form.
:param context: Security context
:param entity: Primitive to be deserialized
:returns: Deserialized form of entity
"""
pass
class NoOpSerializer(Serializer):
"""A serializer that does nothing"""
def serialize_entity(self, context, entity):
return entity
def deserialize_entity(self, context, entity):
return entity

View File

@ -52,7 +52,7 @@ class Launcher(object):
"""
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
@ -72,6 +72,7 @@ class Launcher(object):
:returns: None
"""
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
def stop(self):

View File

@ -61,6 +61,13 @@ class ThreadGroup(object):
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)

View File

@ -135,48 +135,12 @@
# Options defined in ceilometer.openstack.common.db.sqlalchemy.session
#
# The SQLAlchemy connection string used to connect to the
# database (string value)
#sql_connection=sqlite:////ceilometer/openstack/common/db/$sqlite_db
# the filename to use with sqlite (string value)
#sqlite_db=ceilometer.sqlite
# timeout before idle sql connections are reaped (integer
# value)
#sql_idle_timeout=3600
# If passed, use synchronous mode for sqlite (boolean value)
# If true, use synchronous mode for sqlite (boolean value)
#sqlite_synchronous=true
# Minimum number of SQL connections to keep open in a pool
# (integer value)
#sql_min_pool_size=1
# Maximum number of SQL connections to keep open in a pool
# (integer value)
#sql_max_pool_size=5
# maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
#sql_max_retries=10
# interval between retries of opening a sql connection
# (integer value)
#sql_retry_interval=10
# If set, use this value for max_overflow with sqlalchemy
# (integer value)
#sql_max_overflow=<None>
# Verbosity of SQL debugging information. 0=None,
# 100=Everything (integer value)
#sql_connection_debug=0
# Add python stack traces to SQL as comment strings (boolean
# value)
#sql_connection_trace=false
#
# Options defined in ceilometer.openstack.common.eventlet_backdoor
@ -186,6 +150,18 @@
#backdoor_port=<None>
#
# Options defined in ceilometer.openstack.common.lockutils
#
# Whether to disable inter-process locks (boolean value)
#disable_process_locking=false
# Directory to use for lock files. Default to a temp directory
# (string value)
#lock_path=<None>
#
# Options defined in ceilometer.openstack.common.log
#
@ -242,9 +218,11 @@
#log_config=<None>
# A logging.Formatter log message format string which may use
# any of the available logging.LogRecord attributes. Default:
# %(default)s (string value)
#log_format=%(asctime)s %(levelname)8s [%(name)s] %(message)s
# any of the available logging.LogRecord attributes. This
# option is deprecated. Please use
# logging_context_format_string and
# logging_default_format_string instead. (string value)
#log_format=<None>
# Format string for %%(asctime)s in log records. Default:
# %(default)s (string value)
@ -466,16 +444,13 @@
# Name of this node. Must be a valid hostname, FQDN, or IP
# address. Must match "host" option, if running Nova. (string
# value)
#rpc_zmq_host=nova
#rpc_zmq_host=dex
#
# Options defined in ceilometer.openstack.common.rpc.matchmaker
#
# Matchmaker ring file (JSON) (string value)
#matchmaker_ringfile=/etc/nova/matchmaker_ring.json
# Heartbeat frequency (integer value)
#matchmaker_heartbeat_freq=300
@ -534,6 +509,61 @@
#port=4952
[database]
#
# Options defined in ceilometer.openstack.common.db.api
#
# The backend to use for db (string value)
#backend=sqlalchemy
# Enable the experimental use of thread pooling for all DB API
# calls (boolean value)
#use_tpool=false
#
# Options defined in ceilometer.openstack.common.db.sqlalchemy.session
#
# The SQLAlchemy connection string used to connect to the
# database (string value)
#connection=sqlite:////common/db/$sqlite_db
# timeout before idle sql connections are reaped (integer
# value)
#idle_timeout=3600
# Minimum number of SQL connections to keep open in a pool
# (integer value)
#min_pool_size=1
# Maximum number of SQL connections to keep open in a pool
# (integer value)
#max_pool_size=5
# maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
#max_retries=10
# interval between retries of opening a sql connection
# (integer value)
#retry_interval=10
# If set, use this value for max_overflow with sqlalchemy
# (integer value)
#max_overflow=<None>
# Verbosity of SQL debugging information. 0=None,
# 100=Everything (integer value)
#connection_debug=0
# Add python stack traces to SQL as comment strings (boolean
# value)
#connection_trace=false
[publisher_meter]
#
@ -591,4 +621,14 @@
#udp_port=4952
# Total option count: 115
[matchmaker_ring]
#
# Options defined in ceilometer.openstack.common.rpc.matchmaker_ring
#
# Matchmaker ring file (JSON) (string value)
#ringfile=/etc/oslo/matchmaker_ring.json
# Total option count: 119

View File

@ -17,4 +17,5 @@ module=rpc
module=service
module=threadgroup
module=timeutils
module=config
base=ceilometer

View File

@ -21,5 +21,5 @@ FILES=$(find ceilometer -type f -name "*.py" ! -path "ceilometer/tests/*" -exec
grep -l "Opt(" {} \; | sort -u)
PYTHONPATH=./:${PYTHONPATH} \
python $(dirname "$0")/extract_opts.py ${FILES} > \
python $(dirname "$0")/../../ceilometer/openstack/common/config/generator.py ${FILES} > \
etc/ceilometer/ceilometer.conf.sample