diff --git a/ceilometer/openstack/common/context.py b/ceilometer/openstack/common/context.py index f5ac0336c..fe7847b20 100644 --- a/ceilometer/openstack/common/context.py +++ b/ceilometer/openstack/common/context.py @@ -40,13 +40,15 @@ class RequestContext(object): """ def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, - read_only=False, show_deleted=False, request_id=None): + read_only=False, show_deleted=False, request_id=None, + instance_uuid=None): self.auth_token = auth_token self.user = user self.tenant = tenant self.is_admin = is_admin self.read_only = read_only self.show_deleted = show_deleted + self.instance_uuid = instance_uuid if not request_id: request_id = generate_request_id() self.request_id = request_id @@ -58,7 +60,8 @@ class RequestContext(object): 'read_only': self.read_only, 'show_deleted': self.show_deleted, 'auth_token': self.auth_token, - 'request_id': self.request_id} + 'request_id': self.request_id, + 'instance_uuid': self.instance_uuid} def get_admin_context(show_deleted=False): diff --git a/ceilometer/openstack/common/db/exception.py b/ceilometer/openstack/common/db/exception.py index 60713e533..81bab2d25 100644 --- a/ceilometer/openstack/common/db/exception.py +++ b/ceilometer/openstack/common/db/exception.py @@ -43,3 +43,9 @@ class DBDeadlock(DBError): class DBInvalidUnicodeParameter(Exception): message = _("Invalid Parameter: " "Unicode is not supported by the current database.") + + +class DbMigrationError(DBError): + """Wraps migration specific exception.""" + def __init__(self, message=None): + super(DbMigrationError, self).__init__(str(message)) diff --git a/ceilometer/openstack/common/db/sqlalchemy/migration.py b/ceilometer/openstack/common/db/sqlalchemy/migration.py index e643d8e45..5cbe4e0e0 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/migration.py +++ b/ceilometer/openstack/common/db/sqlalchemy/migration.py @@ -38,12 +38,53 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +import distutils.version as dist_version +import os import re +import migrate from migrate.changeset import ansisql from migrate.changeset.databases import sqlite +from migrate.versioning import util as migrate_util +import sqlalchemy from sqlalchemy.schema import UniqueConstraint +from ceilometer.openstack.common.db import exception +from ceilometer.openstack.common.db.sqlalchemy import session as db_session +from ceilometer.openstack.common.gettextutils import _ # noqa + + +@migrate_util.decorator +def patched_with_engine(f, *a, **kw): + url = a[0] + engine = migrate_util.construct_engine(url, **kw) + + try: + kw['engine'] = engine + return f(*a, **kw) + finally: + if isinstance(engine, migrate_util.Engine) and engine is not url: + migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine) + engine.dispose() + + +# TODO(jkoelker) When migrate 0.7.3 is released and nova depends +# on that version or higher, this can be removed +MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3') +if (not hasattr(migrate, '__version__') or + dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION): + migrate_util.with_engine = patched_with_engine + + +# NOTE(jkoelker) Delay importing migrate until we are patched +from migrate import exceptions as versioning_exceptions +from migrate.versioning import api as versioning_api +from migrate.versioning.repository import Repository + +_REPOSITORY = None + +get_engine = db_session.get_engine + def _get_unique_constraints(self, table): """Retrieve information about existing unique constraints of the table @@ -157,3 +198,81 @@ def patch_migrate(): _visit_migrate_unique_constraint constraint_cls.__bases__ = (ansisql.ANSIColumnDropper, sqlite.SQLiteConstraintGenerator) + + +def db_sync(abs_path, version=None, init_version=0): + """Upgrade or downgrade a database. + + Function runs the upgrade() or downgrade() functions in change scripts. + + :param abs_path: Absolute path to migrate repository. + :param version: Database will upgrade/downgrade until this version. + If None - database will update to the latest + available version. + :param init_version: Initial database version + """ + if version is not None: + try: + version = int(version) + except ValueError: + raise exception.DbMigrationError( + message=_("version should be an integer")) + + current_version = db_version(abs_path, init_version) + repository = _find_migrate_repo(abs_path) + if version is None or version > current_version: + return versioning_api.upgrade(get_engine(), repository, version) + else: + return versioning_api.downgrade(get_engine(), repository, + version) + + +def db_version(abs_path, init_version): + """Show the current version of the repository. + + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + try: + return versioning_api.db_version(get_engine(), repository) + except versioning_exceptions.DatabaseNotControlledError: + meta = sqlalchemy.MetaData() + engine = get_engine() + meta.reflect(bind=engine) + tables = meta.tables + if len(tables) == 0: + db_version_control(abs_path, init_version) + return versioning_api.db_version(get_engine(), repository) + else: + # Some pre-Essex DB's may not be version controlled. + # Require them to upgrade using Essex first. + raise exception.DbMigrationError( + message=_("Upgrade DB using Essex release first.")) + + +def db_version_control(abs_path, version=None): + """Mark a database as under this repository's version control. + + Once a database is under version control, schema changes should + only be done via change scripts in this repository. + + :param abs_path: Absolute path to migrate repository + :param version: Initial database version + """ + repository = _find_migrate_repo(abs_path) + versioning_api.version_control(get_engine(), repository, version) + return version + + +def _find_migrate_repo(abs_path): + """Get the project's change script repository + + :param abs_path: Absolute path to migrate repository + """ + global _REPOSITORY + if not os.path.exists(abs_path): + raise exception.DbMigrationError("Path %s not found" % abs_path) + if _REPOSITORY is None: + _REPOSITORY = Repository(abs_path) + return _REPOSITORY diff --git a/ceilometer/openstack/common/db/sqlalchemy/models.py b/ceilometer/openstack/common/db/sqlalchemy/models.py index 18377487d..d6d90a3cb 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/models.py +++ b/ceilometer/openstack/common/db/sqlalchemy/models.py @@ -61,13 +61,15 @@ class ModelBase(object): def get(self, key, default=None): return getattr(self, key, default) + def _get_extra_keys(self): + return [] + def __iter__(self): columns = dict(object_mapper(self).columns).keys() # NOTE(russellb): Allow models to specify other keys that can be looked # up, beyond the actual db columns. An example would be the 'name' # property for an Instance. - if hasattr(self, '_extra_keys'): - columns.extend(self._extra_keys()) + columns.extend(self._get_extra_keys()) self._i = iter(columns) return self diff --git a/ceilometer/openstack/common/db/sqlalchemy/session.py b/ceilometer/openstack/common/db/sqlalchemy/session.py index 9345a1535..b6c365804 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/session.py +++ b/ceilometer/openstack/common/db/sqlalchemy/session.py @@ -241,11 +241,11 @@ Efficient use of soft deletes: # This will produce count(bar_refs) db requests. """ +import functools import os.path import re import time -from eventlet import greenthread from oslo.config import cfg import six from sqlalchemy import exc as sqla_exc @@ -279,13 +279,13 @@ database_opts = [ deprecated_opts=[cfg.DeprecatedOpt('sql_connection', group='DEFAULT'), cfg.DeprecatedOpt('sql_connection', - group='DATABASE')], - secret=True), + group='DATABASE'), + cfg.DeprecatedOpt('connection', + group='sql'), ]), cfg.StrOpt('slave_connection', default='', help='The SQLAlchemy connection string used to connect to the ' - 'slave database', - secret=True), + 'slave database'), cfg.IntOpt('idle_timeout', default=3600, deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', @@ -478,6 +478,11 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): if engine_name not in ["mysql", "sqlite", "postgresql"]: return + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) if not m: return @@ -510,6 +515,11 @@ def _raise_if_deadlock_error(operational_error, engine_name): re = _DEADLOCK_RE_DB.get(engine_name) if re is None: return + # FIXME(johannes): The usage of the .message attribute has been + # deprecated since Python 2.6. However, the exceptions raised by + # SQLAlchemy can differ when using unicode() and accessing .message. + # An audit across all three supported engines will be necessary to + # ensure there are no regressions. m = re.match(operational_error.message) if not m: return @@ -517,6 +527,7 @@ def _raise_if_deadlock_error(operational_error, engine_name): def _wrap_db_error(f): + @functools.wraps(f) def _wrap(*args, **kwargs): try: return f(*args, **kwargs) @@ -541,7 +552,6 @@ def _wrap_db_error(f): except Exception as e: LOG.exception(_('DB exception wrapped.')) raise exception.DBError(e) - _wrap.func_name = f.func_name return _wrap @@ -581,14 +591,16 @@ def _add_regexp_listener(dbapi_con, con_record): dbapi_con.create_function('regexp', 2, regexp) -def _greenthread_yield(dbapi_con, con_record): +def _thread_yield(dbapi_con, con_record): """Ensure other greenthreads get a chance to be executed. + If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will + execute instead of time.sleep(0). Force a context switch. With common database backends (eg MySQLdb and sqlite), there is no implicit yield caused by network I/O since they are implemented by C libraries that eventlet cannot monkey patch. """ - greenthread.sleep(0) + time.sleep(0) def _ping_listener(dbapi_conn, connection_rec, connection_proxy): @@ -657,7 +669,7 @@ def create_engine(sql_connection, sqlite_fk=False): engine = sqlalchemy.create_engine(sql_connection, **engine_args) - sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) + sqlalchemy.event.listen(engine, 'checkin', _thread_yield) if 'mysql' in connection_dict.drivername: sqlalchemy.event.listen(engine, 'checkout', _ping_listener) diff --git a/ceilometer/openstack/common/db/sqlalchemy/test_migrations.py b/ceilometer/openstack/common/db/sqlalchemy/test_migrations.py new file mode 100644 index 000000000..0bed212c8 --- /dev/null +++ b/ceilometer/openstack/common/db/sqlalchemy/test_migrations.py @@ -0,0 +1,289 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010-2011 OpenStack Foundation +# Copyright 2012-2013 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import commands +import ConfigParser +import os +import urlparse + +import sqlalchemy +import sqlalchemy.exc + +from ceilometer.openstack.common import lockutils +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import test + +LOG = logging.getLogger(__name__) + + +def _get_connect_string(backend, user, passwd, database): + """Get database connection + + Try to get a connection with a very specific set of values, if we get + these then we'll run the tests, otherwise they are skipped + """ + if backend == "postgres": + backend = "postgresql+psycopg2" + elif backend == "mysql": + backend = "mysql+mysqldb" + else: + raise Exception("Unrecognized backend: '%s'" % backend) + + return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" + % {'backend': backend, 'user': user, 'passwd': passwd, + 'database': database}) + + +def _is_backend_avail(backend, user, passwd, database): + try: + connect_uri = _get_connect_string(backend, user, passwd, database) + engine = sqlalchemy.create_engine(connect_uri) + connection = engine.connect() + except Exception: + # intentionally catch all to handle exceptions even if we don't + # have any backend code loaded. + return False + else: + connection.close() + engine.dispose() + return True + + +def _have_mysql(user, passwd, database): + present = os.environ.get('TEST_MYSQL_PRESENT') + if present is None: + return _is_backend_avail('mysql', user, passwd, database) + return present.lower() in ('', 'true') + + +def _have_postgresql(user, passwd, database): + present = os.environ.get('TEST_POSTGRESQL_PRESENT') + if present is None: + return _is_backend_avail('postgres', user, passwd, database) + return present.lower() in ('', 'true') + + +def get_db_connection_info(conn_pieces): + database = conn_pieces.path.strip('/') + loc_pieces = conn_pieces.netloc.split('@') + host = loc_pieces[1] + + auth_pieces = loc_pieces[0].split(':') + user = auth_pieces[0] + password = "" + if len(auth_pieces) > 1: + password = auth_pieces[1].strip() + + return (user, password, database, host) + + +class BaseMigrationTestCase(test.BaseTestCase): + """Base class fort testing of migration utils.""" + + def __init__(self, *args, **kwargs): + super(BaseMigrationTestCase, self).__init__(*args, **kwargs) + + self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), + 'test_migrations.conf') + # Test machines can set the TEST_MIGRATIONS_CONF variable + # to override the location of the config file for migration testing + self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF', + self.DEFAULT_CONFIG_FILE) + self.test_databases = {} + self.migration_api = None + + def setUp(self): + super(BaseMigrationTestCase, self).setUp() + + # Load test databases from the config file. Only do this + # once. No need to re-run this on each test... + LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH) + if os.path.exists(self.CONFIG_FILE_PATH): + cp = ConfigParser.RawConfigParser() + try: + cp.read(self.CONFIG_FILE_PATH) + defaults = cp.defaults() + for key, value in defaults.items(): + self.test_databases[key] = value + except ConfigParser.ParsingError as e: + self.fail("Failed to read test_migrations.conf config " + "file. Got error: %s" % e) + else: + self.fail("Failed to find test_migrations.conf config " + "file.") + + self.engines = {} + for key, value in self.test_databases.items(): + self.engines[key] = sqlalchemy.create_engine(value) + + # We start each test case with a completely blank slate. + self._reset_databases() + + def tearDown(self): + # We destroy the test data store between each test case, + # and recreate it, which ensures that we have no side-effects + # from the tests + self._reset_databases() + super(BaseMigrationTestCase, self).tearDown() + + def execute_cmd(self, cmd=None): + status, output = commands.getstatusoutput(cmd) + LOG.debug(output) + self.assertEqual(0, status, + "Failed to run: %s\n%s" % (cmd, output)) + + @lockutils.synchronized('pgadmin', 'tests-', external=True) + def _reset_pg(self, conn_pieces): + (user, password, database, host) = get_db_connection_info(conn_pieces) + os.environ['PGPASSWORD'] = password + os.environ['PGUSER'] = user + # note(boris-42): We must create and drop database, we can't + # drop database which we have connected to, so for such + # operations there is a special database template1. + sqlcmd = ("psql -w -U %(user)s -h %(host)s -c" + " '%(sql)s' -d template1") + + sql = ("drop database if exists %s;") % database + droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(droptable) + + sql = ("create database %s;") % database + createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql} + self.execute_cmd(createtable) + + os.unsetenv('PGPASSWORD') + os.unsetenv('PGUSER') + + def _reset_databases(self): + for key, engine in self.engines.items(): + conn_string = self.test_databases[key] + conn_pieces = urlparse.urlparse(conn_string) + engine.dispose() + if conn_string.startswith('sqlite'): + # We can just delete the SQLite database, which is + # the easiest and cleanest solution + db_path = conn_pieces.path.strip('/') + if os.path.exists(db_path): + os.unlink(db_path) + # No need to recreate the SQLite DB. SQLite will + # create it for us if it's not there... + elif conn_string.startswith('mysql'): + # We can execute the MySQL client to destroy and re-create + # the MYSQL database, which is easier and less error-prone + # than using SQLAlchemy to do this via MetaData...trust me. + (user, password, database, host) = \ + get_db_connection_info(conn_pieces) + sql = ("drop database if exists %(db)s; " + "create database %(db)s;") % {'db': database} + cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s " + "-e \"%(sql)s\"") % {'user': user, 'password': password, + 'host': host, 'sql': sql} + self.execute_cmd(cmd) + elif conn_string.startswith('postgresql'): + self._reset_pg(conn_pieces) + + +class WalkVersionsMixin(object): + def _walk_versions(self, engine=None, snake_walk=False, downgrade=True): + # Determine latest version script from the repo, then + # upgrade from 1 through to the latest, with no data + # in the databases. This just checks that the schema itself + # upgrades successfully. + + # Place the database under version control + self.migration_api.version_control(engine, self.REPOSITORY, + self.INIT_VERSION) + self.assertEqual(self.INIT_VERSION, + self.migration_api.db_version(engine, + self.REPOSITORY)) + + LOG.debug('latest version is %s' % self.REPOSITORY.latest) + versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1) + + for version in versions: + # upgrade -> downgrade -> upgrade + self._migrate_up(engine, version, with_data=True) + if snake_walk: + downgraded = self._migrate_down( + engine, version - 1, with_data=True) + if downgraded: + self._migrate_up(engine, version) + + if downgrade: + # Now walk it back down to 0 from the latest, testing + # the downgrade paths. + for version in reversed(versions): + # downgrade -> upgrade -> downgrade + downgraded = self._migrate_down(engine, version - 1) + + if snake_walk and downgraded: + self._migrate_up(engine, version) + self._migrate_down(engine, version - 1) + + def _migrate_down(self, engine, version, with_data=False): + try: + self.migration_api.downgrade(engine, self.REPOSITORY, version) + except NotImplementedError: + # NOTE(sirp): some migrations, namely release-level + # migrations, don't support a downgrade. + return False + + self.assertEqual( + version, self.migration_api.db_version(engine, self.REPOSITORY)) + + # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target' + # version). So if we have any downgrade checks, they need to be run for + # the previous (higher numbered) migration. + if with_data: + post_downgrade = getattr( + self, "_post_downgrade_%03d" % (version + 1), None) + if post_downgrade: + post_downgrade(engine) + + return True + + def _migrate_up(self, engine, version, with_data=False): + """migrate up to a new version of the db. + + We allow for data insertion and post checks at every + migration version with special _pre_upgrade_### and + _check_### functions in the main test. + """ + # NOTE(sdague): try block is here because it's impossible to debug + # where a failed data migration happens otherwise + try: + if with_data: + data = None + pre_upgrade = getattr( + self, "_pre_upgrade_%03d" % version, None) + if pre_upgrade: + data = pre_upgrade(engine) + + self.migration_api.upgrade(engine, self.REPOSITORY, version) + self.assertEqual(version, + self.migration_api.db_version(engine, + self.REPOSITORY)) + if with_data: + check = getattr(self, "_check_%03d" % version, None) + if check: + check(engine, data) + except Exception: + LOG.error("Failed to migrate to version %s on engine %s" % + (version, engine)) + raise diff --git a/ceilometer/openstack/common/db/sqlalchemy/utils.py b/ceilometer/openstack/common/db/sqlalchemy/utils.py index bda01897c..a9c073dfa 100644 --- a/ceilometer/openstack/common/db/sqlalchemy/utils.py +++ b/ceilometer/openstack/common/db/sqlalchemy/utils.py @@ -18,6 +18,8 @@ # License for the specific language governing permissions and limitations # under the License. +import re + from migrate.changeset import UniqueConstraint import sqlalchemy from sqlalchemy import Boolean @@ -38,13 +40,21 @@ from sqlalchemy.types import NullType from ceilometer.openstack.common.gettextutils import _ # noqa -from ceilometer.openstack.common import exception from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import timeutils LOG = logging.getLogger(__name__) +_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+") + + +def sanitize_db_url(url): + match = _DBURL_REGEX.match(url) + if match: + return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):]) + return url + class InvalidSortKey(Exception): message = _("Sort key supplied was not valid.") @@ -175,6 +185,10 @@ def visit_insert_from_select(element, compiler, **kw): compiler.process(element.select)) +class ColumnError(Exception): + """Error raised when no column or an invalid column is found.""" + + def _get_not_supported_column(col_name_col_instance, column_name): try: column = col_name_col_instance[column_name] @@ -182,13 +196,13 @@ def _get_not_supported_column(col_name_col_instance, column_name): msg = _("Please specify column %s in col_name_col_instance " "param. It is required because column has unsupported " "type by sqlite).") - raise exception.OpenstackException(message=msg % column_name) + raise ColumnError(msg % column_name) if not isinstance(column, Column): msg = _("col_name_col_instance param has wrong type of " "column instance for column %s It should be instance " "of sqlalchemy.Column.") - raise exception.OpenstackException(message=msg % column_name) + raise ColumnError(msg % column_name) return column @@ -286,8 +300,7 @@ def _get_default_deleted_value(table): return 0 if isinstance(table.c.id.type, String): return "" - raise exception.OpenstackException( - message=_("Unsupported id columns type")) + raise ColumnError(_("Unsupported id columns type")) def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): @@ -357,7 +370,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, constraints = [constraint.copy() for constraint in table.constraints] - meta = MetaData(bind=migrate_engine) + meta = table.metadata new_table = Table(table_name + "__tmp__", meta, *(columns + constraints)) new_table.create() diff --git a/ceilometer/openstack/common/deprecated/__init__.py b/ceilometer/openstack/common/deprecated/__init__.py new file mode 100644 index 000000000..d0ce7ddca --- /dev/null +++ b/ceilometer/openstack/common/deprecated/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2013 Red Hat, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import warnings + +msg = ("Modules in this package are deprecated " + "and will be removed in future releases") + +warnings.warn(msg, DeprecationWarning) diff --git a/ceilometer/openstack/common/deprecated/wsgi.py b/ceilometer/openstack/common/deprecated/wsgi.py new file mode 100644 index 000000000..718e180db --- /dev/null +++ b/ceilometer/openstack/common/deprecated/wsgi.py @@ -0,0 +1,736 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Utility methods for working with WSGI servers.""" + +import eventlet +eventlet.patcher.monkey_patch(all=False, socket=True) + +import datetime +import errno +import socket +import time + +import eventlet.wsgi +from oslo.config import cfg +import routes +import routes.middleware +import six +import webob.dec +import webob.exc +from xml.dom import minidom +from xml.parsers import expat + +from ceilometer.openstack.common.gettextutils import _ # noqa +from ceilometer.openstack.common import jsonutils +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import service +from ceilometer.openstack.common import sslutils +from ceilometer.openstack.common import xmlutils + +socket_opts = [ + cfg.IntOpt('backlog', + default=4096, + help="Number of backlog requests to configure the socket with"), + cfg.IntOpt('tcp_keepidle', + default=600, + help="Sets the value of TCP_KEEPIDLE in seconds for each " + "server socket. Not supported on OS X."), +] + +CONF = cfg.CONF +CONF.register_opts(socket_opts) + +LOG = logging.getLogger(__name__) + + +class MalformedRequestBody(Exception): + def __init__(self, reason): + super(MalformedRequestBody, self).__init__( + "Malformed message body: %s", reason) + + +class InvalidContentType(Exception): + def __init__(self, content_type): + super(InvalidContentType, self).__init__( + "Invalid content type %s", content_type) + + +def run_server(application, port, **kwargs): + """Run a WSGI server with the given application.""" + sock = eventlet.listen(('0.0.0.0', port)) + eventlet.wsgi.server(sock, application, **kwargs) + + +class Service(service.Service): + """Provides a Service API for wsgi servers. + + This gives us the ability to launch wsgi servers with the + Launcher classes in service.py. + """ + + def __init__(self, application, port, + host='0.0.0.0', backlog=4096, threads=1000): + self.application = application + self._port = port + self._host = host + self._backlog = backlog if backlog else CONF.backlog + self._socket = self._get_socket(host, port, self._backlog) + super(Service, self).__init__(threads) + + def _get_socket(self, host, port, backlog): + # TODO(dims): eventlet's green dns/socket module does not actually + # support IPv6 in getaddrinfo(). We need to get around this in the + # future or monitor upstream for a fix + info = socket.getaddrinfo(host, + port, + socket.AF_UNSPEC, + socket.SOCK_STREAM)[0] + family = info[0] + bind_addr = info[-1] + + sock = None + retry_until = time.time() + 30 + while not sock and time.time() < retry_until: + try: + sock = eventlet.listen(bind_addr, + backlog=backlog, + family=family) + if sslutils.is_enabled(): + sock = sslutils.wrap(sock) + + except socket.error as err: + if err.args[0] != errno.EADDRINUSE: + raise + eventlet.sleep(0.1) + if not sock: + raise RuntimeError(_("Could not bind to %(host)s:%(port)s " + "after trying for 30 seconds") % + {'host': host, 'port': port}) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sockets can hang around forever without keepalive + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # This option isn't available in the OS X version of eventlet + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, + CONF.tcp_keepidle) + + return sock + + def start(self): + """Start serving this service using the provided server instance. + + :returns: None + + """ + super(Service, self).start() + self.tg.add_thread(self._run, self.application, self._socket) + + @property + def backlog(self): + return self._backlog + + @property + def host(self): + return self._socket.getsockname()[0] if self._socket else self._host + + @property + def port(self): + return self._socket.getsockname()[1] if self._socket else self._port + + def stop(self): + """Stop serving this API. + + :returns: None + + """ + super(Service, self).stop() + + def _run(self, application, socket): + """Start a WSGI server in a new green thread.""" + logger = logging.getLogger('eventlet.wsgi') + eventlet.wsgi.server(socket, + application, + custom_pool=self.tg.pool, + log=logging.WritableLogger(logger)) + + +class Router(object): + + """WSGI middleware that maps incoming requests to WSGI apps.""" + + def __init__(self, mapper): + """Create a router for the given routes.Mapper. + + Each route in `mapper` must specify a 'controller', which is a + WSGI app to call. You'll probably want to specify an 'action' as + well and have your controller be a wsgi.Controller, who will route + the request to the action method. + + Examples: + mapper = routes.Mapper() + sc = ServerController() + + # Explicit mapping of one route to a controller+action + mapper.connect(None, "/svrlist", controller=sc, action="list") + + # Actions are all implicitly defined + mapper.resource("server", "servers", controller=sc) + + # Pointing to an arbitrary WSGI app. You can specify the + # {path_info:.*} parameter so the target app can be handed just that + # section of the URL. + mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp()) + """ + self.map = mapper + self._router = routes.middleware.RoutesMiddleware(self._dispatch, + self.map) + + @webob.dec.wsgify + def __call__(self, req): + """Route the incoming request to a controller based on self.map. + + If no match, return a 404. + """ + return self._router + + @staticmethod + @webob.dec.wsgify + def _dispatch(req): + """Gets application from the environment. + + Called by self._router after matching the incoming request to a route + and putting the information into req.environ. Either returns 404 + or the routed WSGI app's response. + """ + match = req.environ['wsgiorg.routing_args'][1] + if not match: + return webob.exc.HTTPNotFound() + app = match['controller'] + return app + + +class Request(webob.Request): + """Add some Openstack API-specific logic to the base webob.Request.""" + + default_request_content_types = ('application/json', 'application/xml') + default_accept_types = ('application/json', 'application/xml') + default_accept_type = 'application/json' + + def best_match_content_type(self, supported_content_types=None): + """Determine the requested response content-type. + + Based on the query extension then the Accept header. + Defaults to default_accept_type if we don't find a preference + + """ + supported_content_types = (supported_content_types or + self.default_accept_types) + + parts = self.path.rsplit('.', 1) + if len(parts) > 1: + ctype = 'application/{0}'.format(parts[1]) + if ctype in supported_content_types: + return ctype + + bm = self.accept.best_match(supported_content_types) + return bm or self.default_accept_type + + def get_content_type(self, allowed_content_types=None): + """Determine content type of the request body. + + Does not do any body introspection, only checks header + + """ + if "Content-Type" not in self.headers: + return None + + content_type = self.content_type + allowed_content_types = (allowed_content_types or + self.default_request_content_types) + + if content_type not in allowed_content_types: + raise InvalidContentType(content_type=content_type) + return content_type + + +class Resource(object): + """WSGI app that handles (de)serialization and controller dispatch. + + Reads routing information supplied by RoutesMiddleware and calls + the requested action method upon its deserializer, controller, + and serializer. Those three objects may implement any of the basic + controller action methods (create, update, show, index, delete) + along with any that may be specified in the api router. A 'default' + method may also be implemented to be used in place of any + non-implemented actions. Deserializer methods must accept a request + argument and return a dictionary. Controller methods must accept a + request argument. Additionally, they must also accept keyword + arguments that represent the keys returned by the Deserializer. They + may raise a webob.exc exception or return a dict, which will be + serialized by requested content type. + """ + def __init__(self, controller, deserializer=None, serializer=None): + """Initiates Resource object. + + :param controller: object that implement methods created by routes lib + :param deserializer: object that supports webob request deserialization + through controller-like actions + :param serializer: object that supports webob response serialization + through controller-like actions + """ + self.controller = controller + self.serializer = serializer or ResponseSerializer() + self.deserializer = deserializer or RequestDeserializer() + + @webob.dec.wsgify(RequestClass=Request) + def __call__(self, request): + """WSGI method that controls (de)serialization and method dispatch.""" + + try: + action, action_args, accept = self.deserialize_request(request) + except InvalidContentType: + msg = _("Unsupported Content-Type") + return webob.exc.HTTPUnsupportedMediaType(explanation=msg) + except MalformedRequestBody: + msg = _("Malformed request body") + return webob.exc.HTTPBadRequest(explanation=msg) + + action_result = self.execute_action(action, request, **action_args) + try: + return self.serialize_response(action, action_result, accept) + # return unserializable result (typically a webob exc) + except Exception: + return action_result + + def deserialize_request(self, request): + return self.deserializer.deserialize(request) + + def serialize_response(self, action, action_result, accept): + return self.serializer.serialize(action_result, accept, action) + + def execute_action(self, action, request, **action_args): + return self.dispatch(self.controller, action, request, **action_args) + + def dispatch(self, obj, action, *args, **kwargs): + """Find action-specific method on self and call it.""" + try: + method = getattr(obj, action) + except AttributeError: + method = getattr(obj, 'default') + + return method(*args, **kwargs) + + def get_action_args(self, request_environment): + """Parse dictionary created by routes library.""" + try: + args = request_environment['wsgiorg.routing_args'][1].copy() + except Exception: + return {} + + try: + del args['controller'] + except KeyError: + pass + + try: + del args['format'] + except KeyError: + pass + + return args + + +class ActionDispatcher(object): + """Maps method name to local methods through action name.""" + + def dispatch(self, *args, **kwargs): + """Find and call local method.""" + action = kwargs.pop('action', 'default') + action_method = getattr(self, str(action), self.default) + return action_method(*args, **kwargs) + + def default(self, data): + raise NotImplementedError() + + +class DictSerializer(ActionDispatcher): + """Default request body serialization.""" + + def serialize(self, data, action='default'): + return self.dispatch(data, action=action) + + def default(self, data): + return "" + + +class JSONDictSerializer(DictSerializer): + """Default JSON request body serialization.""" + + def default(self, data): + def sanitizer(obj): + if isinstance(obj, datetime.datetime): + _dtime = obj - datetime.timedelta(microseconds=obj.microsecond) + return _dtime.isoformat() + return six.text_type(obj) + return jsonutils.dumps(data, default=sanitizer) + + +class XMLDictSerializer(DictSerializer): + + def __init__(self, metadata=None, xmlns=None): + """Initiates XMLDictSerializer object. + + :param metadata: information needed to deserialize xml into + a dictionary. + :param xmlns: XML namespace to include with serialized xml + """ + super(XMLDictSerializer, self).__init__() + self.metadata = metadata or {} + self.xmlns = xmlns + + def default(self, data): + # We expect data to contain a single key which is the XML root. + root_key = data.keys()[0] + doc = minidom.Document() + node = self._to_xml_node(doc, self.metadata, root_key, data[root_key]) + + return self.to_xml_string(node) + + def to_xml_string(self, node, has_atom=False): + self._add_xmlns(node, has_atom) + return node.toprettyxml(indent=' ', encoding='UTF-8') + + #NOTE (ameade): the has_atom should be removed after all of the + # xml serializers and view builders have been updated to the current + # spec that required all responses include the xmlns:atom, the has_atom + # flag is to prevent current tests from breaking + def _add_xmlns(self, node, has_atom=False): + if self.xmlns is not None: + node.setAttribute('xmlns', self.xmlns) + if has_atom: + node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom") + + def _to_xml_node(self, doc, metadata, nodename, data): + """Recursive method to convert data members to XML nodes.""" + result = doc.createElement(nodename) + + # Set the xml namespace if one is specified + # TODO(justinsb): We could also use prefixes on the keys + xmlns = metadata.get('xmlns', None) + if xmlns: + result.setAttribute('xmlns', xmlns) + + #TODO(bcwaldon): accomplish this without a type-check + if type(data) is list: + collections = metadata.get('list_collections', {}) + if nodename in collections: + metadata = collections[nodename] + for item in data: + node = doc.createElement(metadata['item_name']) + node.setAttribute(metadata['item_key'], str(item)) + result.appendChild(node) + return result + singular = metadata.get('plurals', {}).get(nodename, None) + if singular is None: + if nodename.endswith('s'): + singular = nodename[:-1] + else: + singular = 'item' + for item in data: + node = self._to_xml_node(doc, metadata, singular, item) + result.appendChild(node) + #TODO(bcwaldon): accomplish this without a type-check + elif type(data) is dict: + collections = metadata.get('dict_collections', {}) + if nodename in collections: + metadata = collections[nodename] + for k, v in data.items(): + node = doc.createElement(metadata['item_name']) + node.setAttribute(metadata['item_key'], str(k)) + text = doc.createTextNode(str(v)) + node.appendChild(text) + result.appendChild(node) + return result + attrs = metadata.get('attributes', {}).get(nodename, {}) + for k, v in data.items(): + if k in attrs: + result.setAttribute(k, str(v)) + else: + node = self._to_xml_node(doc, metadata, k, v) + result.appendChild(node) + else: + # Type is atom + node = doc.createTextNode(str(data)) + result.appendChild(node) + return result + + def _create_link_nodes(self, xml_doc, links): + link_nodes = [] + for link in links: + link_node = xml_doc.createElement('atom:link') + link_node.setAttribute('rel', link['rel']) + link_node.setAttribute('href', link['href']) + if 'type' in link: + link_node.setAttribute('type', link['type']) + link_nodes.append(link_node) + return link_nodes + + +class ResponseHeadersSerializer(ActionDispatcher): + """Default response headers serialization.""" + + def serialize(self, response, data, action): + self.dispatch(response, data, action=action) + + def default(self, response, data): + response.status_int = 200 + + +class ResponseSerializer(object): + """Encode the necessary pieces into a response object.""" + + def __init__(self, body_serializers=None, headers_serializer=None): + self.body_serializers = { + 'application/xml': XMLDictSerializer(), + 'application/json': JSONDictSerializer(), + } + self.body_serializers.update(body_serializers or {}) + + self.headers_serializer = (headers_serializer or + ResponseHeadersSerializer()) + + def serialize(self, response_data, content_type, action='default'): + """Serialize a dict into a string and wrap in a wsgi.Request object. + + :param response_data: dict produced by the Controller + :param content_type: expected mimetype of serialized response body + + """ + response = webob.Response() + self.serialize_headers(response, response_data, action) + self.serialize_body(response, response_data, content_type, action) + return response + + def serialize_headers(self, response, data, action): + self.headers_serializer.serialize(response, data, action) + + def serialize_body(self, response, data, content_type, action): + response.headers['Content-Type'] = content_type + if data is not None: + serializer = self.get_body_serializer(content_type) + response.body = serializer.serialize(data, action) + + def get_body_serializer(self, content_type): + try: + return self.body_serializers[content_type] + except (KeyError, TypeError): + raise InvalidContentType(content_type=content_type) + + +class RequestHeadersDeserializer(ActionDispatcher): + """Default request headers deserializer""" + + def deserialize(self, request, action): + return self.dispatch(request, action=action) + + def default(self, request): + return {} + + +class RequestDeserializer(object): + """Break up a Request object into more useful pieces.""" + + def __init__(self, body_deserializers=None, headers_deserializer=None, + supported_content_types=None): + + self.supported_content_types = supported_content_types + + self.body_deserializers = { + 'application/xml': XMLDeserializer(), + 'application/json': JSONDeserializer(), + } + self.body_deserializers.update(body_deserializers or {}) + + self.headers_deserializer = (headers_deserializer or + RequestHeadersDeserializer()) + + def deserialize(self, request): + """Extract necessary pieces of the request. + + :param request: Request object + :returns: tuple of (expected controller action name, dictionary of + keyword arguments to pass to the controller, the expected + content type of the response) + + """ + action_args = self.get_action_args(request.environ) + action = action_args.pop('action', None) + + action_args.update(self.deserialize_headers(request, action)) + action_args.update(self.deserialize_body(request, action)) + + accept = self.get_expected_content_type(request) + + return (action, action_args, accept) + + def deserialize_headers(self, request, action): + return self.headers_deserializer.deserialize(request, action) + + def deserialize_body(self, request, action): + if not request.body: + LOG.debug(_("Empty body provided in request")) + return {} + + try: + content_type = request.get_content_type() + except InvalidContentType: + LOG.debug(_("Unrecognized Content-Type provided in request")) + raise + + if content_type is None: + LOG.debug(_("No Content-Type provided in request")) + return {} + + try: + deserializer = self.get_body_deserializer(content_type) + except InvalidContentType: + LOG.debug(_("Unable to deserialize body as provided Content-Type")) + raise + + return deserializer.deserialize(request.body, action) + + def get_body_deserializer(self, content_type): + try: + return self.body_deserializers[content_type] + except (KeyError, TypeError): + raise InvalidContentType(content_type=content_type) + + def get_expected_content_type(self, request): + return request.best_match_content_type(self.supported_content_types) + + def get_action_args(self, request_environment): + """Parse dictionary created by routes library.""" + try: + args = request_environment['wsgiorg.routing_args'][1].copy() + except Exception: + return {} + + try: + del args['controller'] + except KeyError: + pass + + try: + del args['format'] + except KeyError: + pass + + return args + + +class TextDeserializer(ActionDispatcher): + """Default request body deserialization.""" + + def deserialize(self, datastring, action='default'): + return self.dispatch(datastring, action=action) + + def default(self, datastring): + return {} + + +class JSONDeserializer(TextDeserializer): + + def _from_json(self, datastring): + try: + return jsonutils.loads(datastring) + except ValueError: + msg = _("cannot understand JSON") + raise MalformedRequestBody(reason=msg) + + def default(self, datastring): + return {'body': self._from_json(datastring)} + + +class XMLDeserializer(TextDeserializer): + + def __init__(self, metadata=None): + """Initiates XMLDeserializer object. + + :param metadata: information needed to deserialize xml into + a dictionary. + """ + super(XMLDeserializer, self).__init__() + self.metadata = metadata or {} + + def _from_xml(self, datastring): + plurals = set(self.metadata.get('plurals', {})) + + try: + node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0] + return {node.nodeName: self._from_xml_node(node, plurals)} + except expat.ExpatError: + msg = _("cannot understand XML") + raise MalformedRequestBody(reason=msg) + + def _from_xml_node(self, node, listnames): + """Convert a minidom node to a simple Python type. + + :param listnames: list of XML node names whose subnodes should + be considered list items. + + """ + + if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3: + return node.childNodes[0].nodeValue + elif node.nodeName in listnames: + return [self._from_xml_node(n, listnames) for n in node.childNodes] + else: + result = dict() + for attr in node.attributes.keys(): + result[attr] = node.attributes[attr].nodeValue + for child in node.childNodes: + if child.nodeType != node.TEXT_NODE: + result[child.nodeName] = self._from_xml_node(child, + listnames) + return result + + def find_first_child_named(self, parent, name): + """Search a nodes children for the first child with a given name.""" + for node in parent.childNodes: + if node.nodeName == name: + return node + return None + + def find_children_named(self, parent, name): + """Return all of a nodes children who have the given name.""" + for node in parent.childNodes: + if node.nodeName == name: + yield node + + def extract_text(self, node): + """Get the text field contained by the given node.""" + if len(node.childNodes) == 1: + child = node.childNodes[0] + if child.nodeType == child.TEXT_NODE: + return child.nodeValue + return "" + + def default(self, datastring): + return {'body': self._from_xml(datastring)} diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index fe89380f5..771275b29 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -24,6 +24,8 @@ import sys import time import traceback +import six + from ceilometer.openstack.common.gettextutils import _ # noqa @@ -65,7 +67,7 @@ class save_and_reraise_exception(object): self.tb)) return False if self.reraise: - raise self.type_, self.value, self.tb + six.reraise(self.type_, self.value, self.tb) def forever_retry_uncaught_exceptions(infunc): @@ -77,7 +79,8 @@ def forever_retry_uncaught_exceptions(infunc): try: return infunc(*args, **kwargs) except Exception as exc: - if exc.message == last_exc_message: + this_exc_message = six.u(str(exc)) + if this_exc_message == last_exc_message: exc_count += 1 else: exc_count = 1 @@ -85,12 +88,12 @@ def forever_retry_uncaught_exceptions(infunc): # the exception message changes cur_time = int(time.time()) if (cur_time - last_log_time > 60 or - exc.message != last_exc_message): + this_exc_message != last_exc_message): logging.exception( _('Unexpected exception occurred %d time(s)... ' 'retrying.') % exc_count) last_log_time = cur_time - last_exc_message = exc.message + last_exc_message = this_exc_message exc_count = 0 # This should be a very rare event. In case it isn't, do # a sleep. diff --git a/ceilometer/openstack/common/fileutils.py b/ceilometer/openstack/common/fileutils.py index f74a6227e..3eb5576e6 100644 --- a/ceilometer/openstack/common/fileutils.py +++ b/ceilometer/openstack/common/fileutils.py @@ -19,6 +19,7 @@ import contextlib import errno import os +import tempfile from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ # noqa @@ -69,33 +70,34 @@ def read_cached_file(filename, force_reload=False): return (reloaded, cache_info['data']) -def delete_if_exists(path): +def delete_if_exists(path, remove=os.unlink): """Delete a file, but ignore file not found error. :param path: File to delete + :param remove: Optional function to remove passed path """ try: - os.unlink(path) + remove(path) except OSError as e: - if e.errno == errno.ENOENT: - return - else: + if e.errno != errno.ENOENT: raise @contextlib.contextmanager -def remove_path_on_error(path): +def remove_path_on_error(path, remove=delete_if_exists): """Protect code that wants to operate on PATH atomically. Any exception will cause PATH to be removed. :param path: File to work with + :param remove: Optional function to remove passed path """ + try: yield except Exception: with excutils.save_and_reraise_exception(): - delete_if_exists(path) + remove(path) def file_open(*args, **kwargs): @@ -108,3 +110,30 @@ def file_open(*args, **kwargs): state at all (for unit tests) """ return file(*args, **kwargs) + + +def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): + """Create temporary file or use existing file. + + This util is needed for creating temporary file with + specified content, suffix and prefix. If path is not None, + it will be used for writing content. If the path doesn't + exist it'll be created. + + :param content: content for temporary file. + :param path: same as parameter 'dir' for mkstemp + :param suffix: same as parameter 'suffix' for mkstemp + :param prefix: same as parameter 'prefix' for mkstemp + + For example: it can be used in database tests for creating + configuration files. + """ + if path: + ensure_tree(path) + + (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix) + try: + os.write(fd, content) + finally: + os.close(fd) + return path diff --git a/ceilometer/openstack/common/fixture/config.py b/ceilometer/openstack/common/fixture/config.py index cf52a6625..7b044ef74 100644 --- a/ceilometer/openstack/common/fixture/config.py +++ b/ceilometer/openstack/common/fixture/config.py @@ -17,6 +17,7 @@ # under the License. import fixtures from oslo.config import cfg +import six class Config(fixtures.Fixture): @@ -41,5 +42,5 @@ class Config(fixtures.Fixture): def config(self, **kw): group = kw.pop('group', None) - for k, v in kw.iteritems(): + for k, v in six.iteritems(kw): self.conf.set_override(k, v, group) diff --git a/ceilometer/openstack/common/gettextutils.py b/ceilometer/openstack/common/gettextutils.py index 703f43c31..12c8fa8b8 100644 --- a/ceilometer/openstack/common/gettextutils.py +++ b/ceilometer/openstack/common/gettextutils.py @@ -26,10 +26,13 @@ Usual usage in an openstack.common module: import copy import gettext -import logging.handlers +import logging import os import re -import UserString +try: + import UserString as _userString +except ImportError: + import collections as _userString from babel import localedata import six @@ -37,7 +40,7 @@ import six _localedir = os.environ.get('ceilometer'.upper() + '_LOCALEDIR') _t = gettext.translation('ceilometer', localedir=_localedir, fallback=True) -_AVAILABLE_LANGUAGES = [] +_AVAILABLE_LANGUAGES = {} USE_LAZY = False @@ -57,6 +60,8 @@ def _(msg): if USE_LAZY: return Message(msg, 'ceilometer') else: + if six.PY3: + return _t.gettext(msg) return _t.ugettext(msg) @@ -102,24 +107,28 @@ def install(domain, lazy=False): """ return Message(msg, domain) - import __builtin__ - __builtin__.__dict__['_'] = _lazy_gettext + from six import moves + moves.builtins.__dict__['_'] = _lazy_gettext else: localedir = '%s_LOCALEDIR' % domain.upper() - gettext.install(domain, - localedir=os.environ.get(localedir), - unicode=True) + if six.PY3: + gettext.install(domain, + localedir=os.environ.get(localedir)) + else: + gettext.install(domain, + localedir=os.environ.get(localedir), + unicode=True) -class Message(UserString.UserString, object): +class Message(_userString.UserString, object): """Class used to encapsulate translatable messages.""" def __init__(self, msg, domain): # _msg is the gettext msgid and should never change self._msg = msg self._left_extra_msg = '' self._right_extra_msg = '' + self._locale = None self.params = None - self.locale = None self.domain = domain @property @@ -139,8 +148,13 @@ class Message(UserString.UserString, object): localedir=localedir, fallback=True) + if six.PY3: + ugettext = lang.gettext + else: + ugettext = lang.ugettext + full_msg = (self._left_extra_msg + - lang.ugettext(self._msg) + + ugettext(self._msg) + self._right_extra_msg) if self.params is not None: @@ -148,6 +162,33 @@ class Message(UserString.UserString, object): return six.text_type(full_msg) + @property + def locale(self): + return self._locale + + @locale.setter + def locale(self, value): + self._locale = value + if not self.params: + return + + # This Message object may have been constructed with one or more + # Message objects as substitution parameters, given as a single + # Message, or a tuple or Map containing some, so when setting the + # locale for this Message we need to set it for those Messages too. + if isinstance(self.params, Message): + self.params.locale = value + return + if isinstance(self.params, tuple): + for param in self.params: + if isinstance(param, Message): + param.locale = value + return + if isinstance(self.params, dict): + for param in self.params.values(): + if isinstance(param, Message): + param.locale = value + def _save_dictionary_parameter(self, dict_param): full_msg = self.data # look for %(blah) fields in string; @@ -166,7 +207,7 @@ class Message(UserString.UserString, object): params[key] = copy.deepcopy(dict_param[key]) except TypeError: # cast uncopyable thing to unicode string - params[key] = unicode(dict_param[key]) + params[key] = six.text_type(dict_param[key]) return params @@ -185,7 +226,7 @@ class Message(UserString.UserString, object): try: self.params = copy.deepcopy(other) except TypeError: - self.params = unicode(other) + self.params = six.text_type(other) return self @@ -194,11 +235,13 @@ class Message(UserString.UserString, object): return self.data def __str__(self): + if six.PY3: + return self.__unicode__() return self.data.encode('utf-8') def __getstate__(self): to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg', - 'domain', 'params', 'locale'] + 'domain', 'params', '_locale'] new_dict = self.__dict__.fromkeys(to_copy) for attr in to_copy: new_dict[attr] = copy.deepcopy(self.__dict__[attr]) @@ -252,7 +295,7 @@ class Message(UserString.UserString, object): if name in ops: return getattr(self.data, name) else: - return UserString.UserString.__getattribute__(self, name) + return _userString.UserString.__getattribute__(self, name) def get_available_languages(domain): @@ -260,8 +303,8 @@ def get_available_languages(domain): :param domain: the domain to get languages for """ - if _AVAILABLE_LANGUAGES: - return _AVAILABLE_LANGUAGES + if domain in _AVAILABLE_LANGUAGES: + return copy.copy(_AVAILABLE_LANGUAGES[domain]) localedir = '%s_LOCALEDIR' % domain.upper() find = lambda x: gettext.find(domain, @@ -270,7 +313,7 @@ def get_available_languages(domain): # NOTE(mrodden): en_US should always be available (and first in case # order matters) since our in-line message strings are en_US - _AVAILABLE_LANGUAGES.append('en_US') + language_list = ['en_US'] # NOTE(luisg): Babel <1.0 used a function called list(), which was # renamed to locale_identifiers() in >=1.0, the requirements master list # requires >=0.9.6, uncapped, so defensively work with both. We can remove @@ -280,16 +323,17 @@ def get_available_languages(domain): locale_identifiers = list_identifiers() for i in locale_identifiers: if find(i) is not None: - _AVAILABLE_LANGUAGES.append(i) - return _AVAILABLE_LANGUAGES + language_list.append(i) + _AVAILABLE_LANGUAGES[domain] = language_list + return copy.copy(language_list) def get_localized_message(message, user_locale): """Gets a localized version of the given message in the given locale.""" - if (isinstance(message, Message)): + if isinstance(message, Message): if user_locale: message.locale = user_locale - return unicode(message) + return six.text_type(message) else: return message diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index 65b10e68e..88b01c0e7 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -38,14 +38,19 @@ import functools import inspect import itertools import json -import types -import xmlrpclib +try: + import xmlrpclib +except ImportError: + # NOTE(jd): xmlrpclib is not shipped with Python 3 + xmlrpclib = None -import netaddr import six +from ceilometer.openstack.common import gettextutils +from ceilometer.openstack.common import importutils from ceilometer.openstack.common import timeutils +netaddr = importutils.try_import("netaddr") _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.isfunction, inspect.isgeneratorfunction, @@ -53,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.iscode, inspect.isbuiltin, inspect.isroutine, inspect.isabstract] -_simple_types = (types.NoneType, int, basestring, bool, float, long) +_simple_types = (six.string_types + six.integer_types + + (type(None), bool, float)) def to_primitive(value, convert_instances=False, convert_datetime=True, @@ -125,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled - if isinstance(value, xmlrpclib.DateTime): + if xmlrpclib and isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) + elif isinstance(value, gettextutils.Message): + return value.data elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) elif hasattr(value, '__iter__'): @@ -138,7 +146,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Likely an instance of something. Watch for cycles. # Ignore class member vars. return recursive(value.__dict__, level=level + 1) - elif isinstance(value, netaddr.IPAddress): + elif netaddr and isinstance(value, netaddr.IPAddress): return six.text_type(value) else: if any(test(value) for test in _nasty_type_tests): diff --git a/ceilometer/openstack/common/local.py b/ceilometer/openstack/common/local.py index f1bfc824b..e82f17d0f 100644 --- a/ceilometer/openstack/common/local.py +++ b/ceilometer/openstack/common/local.py @@ -15,16 +15,15 @@ # License for the specific language governing permissions and limitations # under the License. -"""Greenthread local storage of variables using weak references""" +"""Local storage of variables using weak references""" +import threading import weakref -from eventlet import corolocal - -class WeakLocal(corolocal.local): +class WeakLocal(threading.local): def __getattribute__(self, attr): - rval = corolocal.local.__getattribute__(self, attr) + rval = super(WeakLocal, self).__getattribute__(attr) if rval: # NOTE(mikal): this bit is confusing. What is stored is a weak # reference, not the value itself. We therefore need to lookup @@ -34,7 +33,7 @@ class WeakLocal(corolocal.local): def __setattr__(self, attr, value): value = weakref.ref(value) - return corolocal.local.__setattr__(self, attr, value) + return super(WeakLocal, self).__setattr__(attr, value) # NOTE(mikal): the name "store" should be deprecated in the future @@ -45,4 +44,4 @@ store = WeakLocal() # "strong" store will hold a reference to the object so that it never falls out # of scope. weak_store = WeakLocal() -strong_store = corolocal.local +strong_store = threading.local() diff --git a/ceilometer/openstack/common/lockutils.py b/ceilometer/openstack/common/lockutils.py index 9fa3468ae..937027911 100644 --- a/ceilometer/openstack/common/lockutils.py +++ b/ceilometer/openstack/common/lockutils.py @@ -20,10 +20,11 @@ import contextlib import errno import functools import os +import threading import time import weakref -from eventlet import semaphore +import fixtures from oslo.config import cfg from ceilometer.openstack.common import fileutils @@ -137,7 +138,8 @@ _semaphores = weakref.WeakValueDictionary() def lock(name, lock_file_prefix=None, external=False, lock_path=None): """Context based lock - This function yields a `semaphore.Semaphore` instance unless external is + This function yields a `threading.Semaphore` instance (if we don't use + eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is True, in which case, it'll yield an InterProcessLock instance. :param lock_file_prefix: The lock_file_prefix argument is used to provide @@ -155,7 +157,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None): # NOTE(soren): If we ever go natively threaded, this will be racy. # See http://stackoverflow.com/questions/5390569/dyn # amically-allocating-and-destroying-mutexes - sem = _semaphores.get(name, semaphore.Semaphore()) + sem = _semaphores.get(name, threading.Semaphore()) if name not in _semaphores: # this check is not racy - we're already holding ref locally # so GC won't remove the item and there was no IO switch @@ -274,3 +276,36 @@ def synchronized_with_prefix(lock_file_prefix): """ return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) + + +class LockFixture(fixtures.Fixture): + """External locking fixture. + + This fixture is basically an alternative to the synchronized decorator with + the external flag so that tearDowns and addCleanups will be included in + the lock context for locking between tests. The fixture is recommended to + be the first line in a test method, like so:: + + def test_method(self): + self.useFixture(LockFixture) + ... + + or the first line in setUp if all the test methods in the class are + required to be serialized. Something like:: + + class TestCase(testtools.testcase): + def setUp(self): + self.useFixture(LockFixture) + super(TestCase, self).setUp() + ... + + This is because addCleanups are put on a LIFO queue that gets run after the + test method exits. (either by completing or raising an exception) + """ + def __init__(self, name, lock_file_prefix=None): + self.mgr = lock(name, lock_file_prefix, True) + + def setUp(self): + super(LockFixture, self).setUp() + self.addCleanup(self.mgr.__exit__, None, None, None) + self.mgr.__enter__() diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index b0a0a9d49..12659e64f 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -39,6 +39,7 @@ import sys import traceback from oslo.config import cfg +import six from six import moves from ceilometer.openstack.common.gettextutils import _ # noqa @@ -207,6 +208,8 @@ def _get_log_file_path(binary=None): binary = binary or _get_binary_name() return '%s.log' % (os.path.join(logdir, binary),) + return None + class BaseLoggerAdapter(logging.LoggerAdapter): @@ -249,6 +252,13 @@ class ContextAdapter(BaseLoggerAdapter): self.warn(stdmsg, *args, **kwargs) def process(self, msg, kwargs): + # NOTE(mrodden): catch any Message/other object and + # coerce to unicode before they can get + # to the python logging and possibly + # cause string encoding trouble + if not isinstance(msg, six.string_types): + msg = six.text_type(msg) + if 'extra' not in kwargs: kwargs['extra'] = {} extra = kwargs['extra'] @@ -260,14 +270,14 @@ class ContextAdapter(BaseLoggerAdapter): extra.update(_dictify_context(context)) instance = kwargs.pop('instance', None) + instance_uuid = (extra.get('instance_uuid', None) or + kwargs.pop('instance_uuid', None)) instance_extra = '' if instance: instance_extra = CONF.instance_format % instance - else: - instance_uuid = kwargs.pop('instance_uuid', None) - if instance_uuid: - instance_extra = (CONF.instance_uuid_format - % {'uuid': instance_uuid}) + elif instance_uuid: + instance_extra = (CONF.instance_uuid_format + % {'uuid': instance_uuid}) extra.update({'instance': instance_extra}) extra.update({"project": self.project}) diff --git a/ceilometer/openstack/common/middleware/audit.py b/ceilometer/openstack/common/middleware/audit.py index 1bda8d117..bb69e313a 100644 --- a/ceilometer/openstack/common/middleware/audit.py +++ b/ceilometer/openstack/common/middleware/audit.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2013 OpenStack LLC. +# Copyright (c) 2013 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/middleware/notifier.py b/ceilometer/openstack/common/middleware/notifier.py index ab744ff0e..183016da8 100644 --- a/ceilometer/openstack/common/middleware/notifier.py +++ b/ceilometer/openstack/common/middleware/notifier.py @@ -57,6 +57,8 @@ class RequestNotifier(base.Middleware): def __init__(self, app, **conf): self.service_name = conf.get('service_name', None) + self.ignore_req_list = [x.upper().strip() for x in + conf.get('ignore_req_list', '').split(',')] super(RequestNotifier, self).__init__(app) @staticmethod @@ -109,13 +111,16 @@ class RequestNotifier(base.Middleware): @webob.dec.wsgify def __call__(self, req): - self.process_request(req) - try: - response = req.get_response(self.application) - except Exception: - type, value, traceback = sys.exc_info() - self.process_response(req, None, value, traceback) - raise + if req.method in self.ignore_req_list: + return req.get_response(self.application) else: - self.process_response(req, response) - return response + self.process_request(req) + try: + response = req.get_response(self.application) + except Exception: + type, value, traceback = sys.exc_info() + self.process_response(req, None, value, traceback) + raise + else: + self.process_response(req, response) + return response diff --git a/ceilometer/openstack/common/notifier/log_notifier.py b/ceilometer/openstack/common/notifier/log_notifier.py index 0417e1fd9..99955a14d 100644 --- a/ceilometer/openstack/common/notifier/log_notifier.py +++ b/ceilometer/openstack/common/notifier/log_notifier.py @@ -25,7 +25,7 @@ CONF = cfg.CONF def notify(_context, message): """Notifies the recipient of the desired event given the model. - Log notifications using openstack's default logging system. + Log notifications using OpenStack's default logging system. """ priority = message.get('priority', diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py index 615427fd6..bccd6a516 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier.py @@ -24,7 +24,7 @@ LOG = logging.getLogger(__name__) notification_topic_opt = cfg.ListOpt( 'notification_topics', default=['notifications', ], - help='AMQP topic used for openstack notifications') + help='AMQP topic used for OpenStack notifications') CONF = cfg.CONF CONF.register_opt(notification_topic_opt) @@ -43,4 +43,5 @@ def notify(context, message): rpc.notify(context, topic, message) except Exception: LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) + "Payload=%(message)s"), + {"topic": topic, "message": message}) diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py index 114c66d16..449bea28c 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier2.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier2.py @@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__) notification_topic_opt = cfg.ListOpt( 'topics', default=['notifications', ], - help='AMQP topic(s) used for openstack notifications') + help='AMQP topic(s) used for OpenStack notifications') opt_group = cfg.OptGroup(name='rpc_notifier2', title='Options for rpc_notifier2') @@ -49,4 +49,5 @@ def notify(context, message): rpc.notify(context, topic, message, envelope=True) except Exception: LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) + "Payload=%(message)s"), + {"topic": topic, "message": message}) diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index bbcbe828f..c0168d8e2 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -221,7 +221,7 @@ class Enforcer(object): if policy_file: return policy_file - raise cfg.ConfigFilesNotFoundError((CONF.policy_file,)) + raise cfg.ConfigFilesNotFoundError((self.policy_file,)) def enforce(self, rule, target, creds, do_raise=False, exc=None, *args, **kwargs): @@ -279,11 +279,10 @@ class Enforcer(object): return result +@six.add_metaclass(abc.ABCMeta) class BaseCheck(object): """Abstract base class for Check classes.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def __str__(self): """String representation of the Check tree rooted at this node.""" @@ -449,7 +448,6 @@ class OrCheck(BaseCheck): for rule in self.rules: if rule(target, cred, enforcer): return True - return False def add_check(self, rule): @@ -627,6 +625,7 @@ def reducer(*tokens): return decorator +@six.add_metaclass(ParseStateMeta) class ParseState(object): """Implement the core of parsing the policy language. @@ -639,8 +638,6 @@ class ParseState(object): shouldn't be that big a problem. """ - __metaclass__ = ParseStateMeta - def __init__(self): """Initialize the ParseState.""" @@ -846,7 +843,13 @@ class GenericCheck(Check): """ # TODO(termie): do dict inspection via dot syntax - match = self.match % target + try: + match = self.match % target + except KeyError: + # While doing GenericCheck if key not + # present in Target return false + return False + if self.kind in creds: return match == six.text_type(creds[self.kind]) return False diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index 82cbdd351..e9dee1287 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -56,8 +56,7 @@ rpc_opts = [ help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['ceilometer.openstack.common.exception', - 'nova.exception', + default=['nova.exception', 'cinder.exception', 'exceptions', ], diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 0c9e5ec43..8e3c7b3c5 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -29,6 +29,7 @@ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import local from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import versionutils CONF = cfg.CONF @@ -441,19 +442,15 @@ def client_exceptions(*exceptions): return outer +# TODO(sirp): we should deprecate this in favor of +# using `versionutils.is_compatible` directly def version_is_compatible(imp_version, version): """Determine whether versions are compatible. :param imp_version: The version implemented :param version: The version requested by an incoming message. """ - version_parts = version.split('.') - imp_version_parts = imp_version.split('.') - if int(version_parts[0]) != int(imp_version_parts[0]): # Major - return False - if int(version_parts[1]) > int(imp_version_parts[1]): # Minor - return False - return True + return versionutils.is_compatible(version, imp_version) def serialize_msg(raw_msg): diff --git a/ceilometer/openstack/common/rpc/serializer.py b/ceilometer/openstack/common/rpc/serializer.py index 76c683103..5fd346d65 100644 --- a/ceilometer/openstack/common/rpc/serializer.py +++ b/ceilometer/openstack/common/rpc/serializer.py @@ -15,11 +15,12 @@ """Provides the definition of an RPC serialization handler""" import abc +import six +@six.add_metaclass(abc.ABCMeta) class Serializer(object): """Generic (de-)serialization definition base class.""" - __metaclass__ = abc.ABCMeta @abc.abstractmethod def serialize_entity(self, context, entity): diff --git a/ceilometer/openstack/common/service.py b/ceilometer/openstack/common/service.py index 0f6a9341d..574dd9096 100644 --- a/ceilometer/openstack/common/service.py +++ b/ceilometer/openstack/common/service.py @@ -221,6 +221,9 @@ class ProcessLauncher(object): status = None signo = 0 + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. try: launcher.wait() except SignalExit as exc: @@ -273,9 +276,6 @@ class ProcessLauncher(object): pid = os.fork() if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. launcher = self._child_process(wrap.service) while True: self._child_process_handle_signal() diff --git a/ceilometer/openstack/common/test.py b/ceilometer/openstack/common/test.py new file mode 100644 index 000000000..00088cee4 --- /dev/null +++ b/ceilometer/openstack/common/test.py @@ -0,0 +1,53 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010-2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Common utilities used in testing""" + +import os + +import fixtures +import testtools + + +class BaseTestCase(testtools.TestCase): + + def setUp(self): + super(BaseTestCase, self).setUp() + self._set_timeout() + self._fake_output() + self.useFixture(fixtures.FakeLogger('ceilometer.openstack.common')) + self.useFixture(fixtures.NestedTempfile()) + + def _set_timeout(self): + test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) + try: + test_timeout = int(test_timeout) + except ValueError: + # If timeout value is invalid do not set a timeout. + test_timeout = 0 + if test_timeout > 0: + self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) + + def _fake_output(self): + if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or + os.environ.get('OS_STDOUT_CAPTURE') == '1'): + stdout = self.useFixture(fixtures.StringStream('stdout')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) + if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or + os.environ.get('OS_STDERR_CAPTURE') == '1'): + stderr = self.useFixture(fixtures.StringStream('stderr')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index bd60489e5..98d877d59 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -21,6 +21,7 @@ Time related utilities and helper functions. import calendar import datetime +import time import iso8601 import six @@ -49,9 +50,9 @@ def parse_isotime(timestr): try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) except TypeError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) def strtime(at=None, fmt=PERFECT_TIME_FORMAT): @@ -90,6 +91,11 @@ def is_newer_than(after, seconds): def utcnow_ts(): """Timestamp version of our utcnow function.""" + if utcnow.override_time is None: + # NOTE(kgriffs): This is several times faster + # than going through calendar.timegm(...) + return int(time.time()) + return calendar.timegm(utcnow().timetuple()) @@ -111,12 +117,15 @@ def iso8601_from_timestamp(timestamp): utcnow.override_time = None -def set_time_override(override_time=datetime.datetime.utcnow()): +def set_time_override(override_time=None): """Overrides utils.utcnow. Make it return a constant time or a list thereof, one at a time. + + :param override_time: datetime instance or list thereof. If not + given, defaults to the current UTC time. """ - utcnow.override_time = override_time + utcnow.override_time = override_time or datetime.datetime.utcnow() def advance_time_delta(timedelta): diff --git a/ceilometer/openstack/common/versionutils.py b/ceilometer/openstack/common/versionutils.py new file mode 100644 index 000000000..f7b1f8a82 --- /dev/null +++ b/ceilometer/openstack/common/versionutils.py @@ -0,0 +1,45 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helpers for comparing version strings. +""" + +import pkg_resources + + +def is_compatible(requested_version, current_version, same_major=True): + """Determine whether `requested_version` is satisfied by + `current_version`; in other words, `current_version` is >= + `requested_version`. + + :param requested_version: version to check for compatibility + :param current_version: version to check against + :param same_major: if True, the major version must be identical between + `requested_version` and `current_version`. This is used when a + major-version difference indicates incompatibility between the two + versions. Since this is the common-case in practice, the default is + True. + :returns: True if compatible, False if not + """ + requested_parts = pkg_resources.parse_version(requested_version) + current_parts = pkg_resources.parse_version(current_version) + + if same_major and (requested_parts[0] != current_parts[0]): + return False + + return current_parts >= requested_parts diff --git a/ceilometer/openstack/common/xmlutils.py b/ceilometer/openstack/common/xmlutils.py new file mode 100644 index 000000000..b131d3e2e --- /dev/null +++ b/ceilometer/openstack/common/xmlutils.py @@ -0,0 +1,74 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from xml.dom import minidom +from xml.parsers import expat +from xml import sax +from xml.sax import expatreader + + +class ProtectedExpatParser(expatreader.ExpatParser): + """An expat parser which disables DTD's and entities by default.""" + + def __init__(self, forbid_dtd=True, forbid_entities=True, + *args, **kwargs): + # Python 2.x old style class + expatreader.ExpatParser.__init__(self, *args, **kwargs) + self.forbid_dtd = forbid_dtd + self.forbid_entities = forbid_entities + + def start_doctype_decl(self, name, sysid, pubid, has_internal_subset): + raise ValueError("Inline DTD forbidden") + + def entity_decl(self, entityName, is_parameter_entity, value, base, + systemId, publicId, notationName): + raise ValueError(" entity declaration forbidden") + + def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name): + # expat 1.2 + raise ValueError(" unparsed entity forbidden") + + def external_entity_ref(self, context, base, systemId, publicId): + raise ValueError(" external entity forbidden") + + def notation_decl(self, name, base, sysid, pubid): + raise ValueError(" notation forbidden") + + def reset(self): + expatreader.ExpatParser.reset(self) + if self.forbid_dtd: + self._parser.StartDoctypeDeclHandler = self.start_doctype_decl + self._parser.EndDoctypeDeclHandler = None + if self.forbid_entities: + self._parser.EntityDeclHandler = self.entity_decl + self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl + self._parser.ExternalEntityRefHandler = self.external_entity_ref + self._parser.NotationDeclHandler = self.notation_decl + try: + self._parser.SkippedEntityHandler = None + except AttributeError: + # some pyexpat versions do not support SkippedEntity + pass + + +def safe_minidom_parse_string(xml_string): + """Parse an XML string using minidom safely. + + """ + try: + return minidom.parseString(xml_string, parser=ProtectedExpatParser()) + except sax.SAXParseException: + raise expat.ExpatError() diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index e3b30980b..99c210531 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -117,6 +117,19 @@ #sqlite_synchronous=true +# +# Options defined in ceilometer.openstack.common.deprecated.wsgi +# + +# Number of backlog requests to configure the socket with +# (integer value) +#backlog=4096 + +# Sets the value of TCP_KEEPIDLE in seconds for each server +# socket. Not supported on OS X. (integer value) +#tcp_keepidle=600 + + # # Options defined in ceilometer.openstack.common.eventlet_backdoor # @@ -224,6 +237,15 @@ #syslog_log_facility=LOG_USER +# +# Options defined in ceilometer.openstack.common.middleware.sizelimit +# + +# the maximum body size per each request(bytes) (integer +# value) +#max_request_body_size=114688 + + # # Options defined in ceilometer.openstack.common.notifier.api # @@ -245,7 +267,7 @@ # Options defined in ceilometer.openstack.common.notifier.rpc_notifier # -# AMQP topic used for openstack notifications (list value) +# AMQP topic used for OpenStack notifications (list value) #notification_topics=notifications @@ -285,7 +307,7 @@ # Modules of exceptions that are permitted to be recreatedupon # receiving exception data from an rpc call. (list value) -#allowed_rpc_exception_modules=ceilometer.openstack.common.exception,nova.exception,cinder.exception,exceptions +#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions # If passed, use a fake RabbitMQ provider (boolean value) #fake_rabbit=false @@ -637,7 +659,7 @@ # Options defined in ceilometer.openstack.common.notifier.rpc_notifier2 # -# AMQP topic(s) used for openstack notifications (list value) +# AMQP topic(s) used for OpenStack notifications (list value) #topics=notifications