Sync db code from oslo-incubator

Synced from commit 5fb343faee1442921e2d610b1a5222e67418c4cd.
Due to changes in oslo.db API, this sync requires a bit more
work on Neutron side.

Sync includes the following commits:

5b7e61c Dispose db connections pool on disconnect
d1988b9 Set sql_mode callback on connect instead of checkout
a1a8280 Fix excessive logging from db.sqlalchemy.session
9933bdd Get mysql_sql_mode parameter from config
96a2217 Prevent incorrect usage of _wrap_db_error()
20a7510 Add from_config() method to EngineFacade
fea119e Drop special case for MySQL traditional mode, update unit tests
dda24eb Introduce mysql_sql_mode option, remove old warning
0b5af67 Introduce a method to set any MySQL session SQL mode
8dccc7b Handle ibm_db_sa DBDuplicateEntry integrity errors
5b9e9f4 Fix doc build errors in db.sqlalchemy
ac84a40 Update log translation domains
86707cd Remove None for dict.get()
0545121 Fix duplicating of SQL queries in logs
fcf517d Update oslo log messages with translation domains
630d395 Don't use cfg.CONF in oslo.db
ce69e7f Don't store engine instances in oslo.db

Change-Id: I0e1d86878d3eb924b01e04dced0f90b4e57757d8
This commit is contained in:
Roman Podoliaka 2014-04-08 11:47:48 +03:00
parent 5966e13dd3
commit 53609f29f3
10 changed files with 1009 additions and 377 deletions

View File

@ -24,7 +24,7 @@ from paste import deploy
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import utils from neutron.common import utils
from neutron.openstack.common.db.sqlalchemy import session as db_session from neutron.openstack.common.db import options as db_options
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc from neutron.openstack.common import rpc
from neutron.version import version_info as neutron_version from neutron.version import version_info as neutron_version
@ -129,7 +129,7 @@ rpc.set_defaults(control_exchange='neutron')
_SQL_CONNECTION_DEFAULT = 'sqlite://' _SQL_CONNECTION_DEFAULT = 'sqlite://'
# Update the default QueuePool parameters. These can be tweaked by the # Update the default QueuePool parameters. These can be tweaked by the
# configuration variables - max_pool_size, max_overflow and pool_timeout # configuration variables - max_pool_size, max_overflow and pool_timeout
db_session.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT, db_options.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT,
sqlite_db='', max_pool_size=10, sqlite_db='', max_pool_size=10,
max_overflow=20, pool_timeout=10) max_overflow=20, pool_timeout=10)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo.config import cfg
import sqlalchemy as sql import sqlalchemy as sql
from neutron.db import model_base from neutron.db import model_base
@ -23,6 +24,22 @@ LOG = logging.getLogger(__name__)
BASE = model_base.BASEV2 BASE = model_base.BASEV2
cfg.CONF.import_opt('connection',
'neutron.openstack.common.db.options',
group='database')
_FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
_FACADE = session.EngineFacade.from_config(
cfg.CONF.database.connection, cfg.CONF, sqlite_fk=True)
return _FACADE
def configure_db(): def configure_db():
"""Configure database. """Configure database.
@ -30,26 +47,31 @@ def configure_db():
Establish the database, create an engine if needed, and register Establish the database, create an engine if needed, and register
the models. the models.
""" """
session.get_engine(sqlite_fk=True)
register_models() register_models()
def clear_db(base=BASE): def clear_db(base=BASE):
unregister_models(base) unregister_models(base)
session.cleanup()
def get_engine():
"""Helper method to grab engine."""
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(autocommit=True, expire_on_commit=False): def get_session(autocommit=True, expire_on_commit=False):
"""Helper method to grab session.""" """Helper method to grab session."""
return session.get_session(autocommit=autocommit, facade = _create_facade_lazily()
expire_on_commit=expire_on_commit, return facade.get_session(autocommit=autocommit,
sqlite_fk=True) expire_on_commit=expire_on_commit)
def register_models(base=BASE): def register_models(base=BASE):
"""Register Models and create properties.""" """Register Models and create properties."""
try: try:
engine = session.get_engine(sqlite_fk=True) facade = _create_facade_lazily()
engine = facade.get_engine()
base.metadata.create_all(engine) base.metadata.create_all(engine)
except sql.exc.OperationalError as e: except sql.exc.OperationalError as e:
LOG.info(_("Database registration exception: %s"), e) LOG.info(_("Database registration exception: %s"), e)
@ -60,7 +82,8 @@ def register_models(base=BASE):
def unregister_models(base=BASE): def unregister_models(base=BASE):
"""Unregister Models, useful clearing out data before testing.""" """Unregister Models, useful clearing out data before testing."""
try: try:
engine = session.get_engine(sqlite_fk=True) facade = _create_facade_lazily()
engine = facade.get_engine()
base.metadata.drop_all(engine) base.metadata.drop_all(engine)
except Exception: except Exception:
LOG.exception(_("Database exception")) LOG.exception(_("Database exception"))

View File

@ -15,11 +15,6 @@
"""Multiple DB API backend support. """Multiple DB API backend support.
Supported configuration options:
The following two parameters are in the 'database' group:
`backend`: DB backend name or full module path to DB backend module.
A DB backend module should implement a method named 'get_backend' which A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB takes no arguments. The method can return any object that implements DB
API methods. API methods.
@ -27,45 +22,14 @@ API methods.
import functools import functools
import logging import logging
import threading
import time import time
from oslo.config import cfg
from neutron.openstack.common.db import exception from neutron.openstack.common.db import exception
from neutron.openstack.common.gettextutils import _ # noqa from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
db_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.BoolOpt('use_db_reconnect',
default=False,
help='Enable the experimental use of database reconnect '
'on connection lost'),
cfg.IntOpt('db_retry_interval',
default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
]
CONF = cfg.CONF
CONF.register_opts(db_opts, 'database')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -75,7 +39,7 @@ def safe_for_db_retry(f):
return f return f
def _wrap_db_retry(f): class wrap_db_retry(object):
"""Retry db.api methods, if DBConnectionError() raised """Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect` Retry decorated db.api methods. If we enabled `use_db_reconnect`
@ -84,53 +48,115 @@ def _wrap_db_retry(f):
Decorator catchs DBConnectionError() and retries function in a Decorator catchs DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached. loop until it succeeds, or until maximum retries count will be reached.
""" """
def __init__(self, retry_interval, max_retries, inc_retry_interval,
max_retry_interval):
super(wrap_db_retry, self).__init__()
self.retry_interval = retry_interval
self.max_retries = max_retries
self.inc_retry_interval = inc_retry_interval
self.max_retry_interval = max_retry_interval
def __call__(self, f):
@functools.wraps(f) @functools.wraps(f)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
next_interval = CONF.database.db_retry_interval next_interval = self.retry_interval
remaining = CONF.database.db_max_retries remaining = self.max_retries
while True: while True:
try: try:
return f(*args, **kwargs) return f(*args, **kwargs)
except exception.DBConnectionError as e: except exception.DBConnectionError as e:
if remaining == 0: if remaining == 0:
LOG.exception(_('DB exceeded retry limit.')) LOG.exception(_LE('DB exceeded retry limit.'))
raise exception.DBError(e) raise exception.DBError(e)
if remaining != -1: if remaining != -1:
remaining -= 1 remaining -= 1
LOG.exception(_('DB connection error.')) LOG.exception(_LE('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so this # NOTE(vsergeyev): We are using patched time module, so
# effectively yields the execution context to # this effectively yields the execution
# another green thread. # context to another green thread.
time.sleep(next_interval) time.sleep(next_interval)
if CONF.database.db_inc_retry_interval: if self.inc_retry_interval:
next_interval = min( next_interval = min(
next_interval * 2, next_interval * 2,
CONF.database.db_max_retry_interval self.max_retry_interval
) )
return wrapper return wrapper
class DBAPI(object): class DBAPI(object):
def __init__(self, backend_mapping=None): def __init__(self, backend_name, backend_mapping=None, lazy=False,
if backend_mapping is None: **kwargs):
backend_mapping = {} """Initialize the chosen DB API backend.
backend_name = CONF.database.backend
# Import the untranslated name if we don't have a :param backend_name: name of the backend to load
# mapping. :type backend_name: str
backend_path = backend_mapping.get(backend_name, backend_name)
:param backend_mapping: backend name -> module/class to load mapping
:type backend_mapping: dict
:param lazy: load the DB backend lazily on the first DB API method call
:type lazy: bool
Keyword arguments:
:keyword use_db_reconnect: retry DB transactions on disconnect or not
:type use_db_reconnect: bool
:keyword retry_interval: seconds between transaction retries
:type retry_interval: int
:keyword inc_retry_interval: increase retry interval or not
:type inc_retry_interval: bool
:keyword max_retry_interval: max interval value between retries
:type max_retry_interval: int
:keyword max_retries: max number of retries before an error is raised
:type max_retries: int
"""
self._backend = None
self._backend_name = backend_name
self._backend_mapping = backend_mapping or {}
self._lock = threading.Lock()
if not lazy:
self._load_backend()
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
self.retry_interval = kwargs.get('retry_interval', 1)
self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
self.max_retries = kwargs.get('max_retries', 20)
def _load_backend(self):
with self._lock:
if not self._backend:
# Import the untranslated name if we don't have a mapping
backend_path = self._backend_mapping.get(self._backend_name,
self._backend_name)
backend_mod = importutils.import_module(backend_path) backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend() self._backend = backend_mod.get_backend()
def __getattr__(self, key): def __getattr__(self, key):
attr = getattr(self.__backend, key) if not self._backend:
self._load_backend()
attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'): if not hasattr(attr, '__call__'):
return attr return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry # DB API methods, decorated with @safe_for_db_retry
# on disconnect. # on disconnect.
if CONF.database.use_db_reconnect and hasattr(attr, 'enable_retry'): if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
attr = _wrap_db_retry(attr) attr = wrap_db_retry(
retry_interval=self.retry_interval,
max_retries=self.max_retries,
inc_retry_interval=self.inc_retry_interval,
max_retry_interval=self.max_retry_interval)(attr)
return attr return attr

View File

@ -0,0 +1,171 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo.config import cfg
database_opts = [
cfg.StrOpt('sqlite_db',
deprecated_group='DEFAULT',
default='neutron.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
deprecated_group='DEFAULT',
default=True,
help='If True, SQLite uses synchronous mode'),
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.StrOpt('connection',
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
cfg.BoolOpt('use_db_reconnect',
default=False,
help='Enable the experimental use of database reconnect '
'on connection lost'),
cfg.IntOpt('db_retry_interval',
default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
]
CONF = cfg.CONF
CONF.register_opts(database_opts, 'database')
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
cfg.set_defaults(database_opts,
max_pool_size=max_pool_size)
if max_overflow is not None:
cfg.set_defaults(database_opts,
max_overflow=max_overflow)
if pool_timeout is not None:
cfg.set_defaults(database_opts,
pool_timeout=pool_timeout)
def list_opts():
"""Returns a list of oslo.config options available in the library.
The returned list includes all oslo.config options which may be registered
at runtime by the library.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
The purpose of this is to allow tools like the Oslo sample config file
generator to discover the options exposed to users by this library.
:returns: a list of (group_name, opts) tuples
"""
return [('database', copy.deepcopy(database_opts))]

View File

@ -26,18 +26,16 @@ from sqlalchemy import Column, Integer
from sqlalchemy import DateTime from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper from sqlalchemy.orm import object_mapper
from neutron.openstack.common.db.sqlalchemy import session as sa
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
class ModelBase(object): class ModelBase(six.Iterator):
"""Base class for models.""" """Base class for models."""
__table_initialized__ = False __table_initialized__ = False
def save(self, session=None): def save(self, session):
"""Save this object.""" """Save this object."""
if not session:
session = sa.get_session()
# NOTE(boris-42): This part of code should be look like: # NOTE(boris-42): This part of code should be look like:
# session.add(self) # session.add(self)
# session.flush() # session.flush()
@ -80,10 +78,14 @@ class ModelBase(object):
self._i = iter(columns) self._i = iter(columns)
return self return self
def next(self): # In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i) n = six.advance_iterator(self._i)
return n, getattr(self, n) return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values): def update(self, values):
"""Make the model object behave like a dict.""" """Make the model object behave like a dict."""
for k, v in six.iteritems(values): for k, v in six.iteritems(values):
@ -110,7 +112,7 @@ class SoftDeleteMixin(object):
deleted_at = Column(DateTime) deleted_at = Column(DateTime)
deleted = Column(Integer, default=0) deleted = Column(Integer, default=0)
def soft_delete(self, session=None): def soft_delete(self, session):
"""Mark this object as deleted.""" """Mark this object as deleted."""
self.deleted = self.id self.deleted = self.id
self.deleted_at = timeutils.utcnow() self.deleted_at = timeutils.utcnow()

View File

@ -0,0 +1,157 @@
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
from six import moves
import sqlalchemy
from neutron.openstack.common.db import exception as exc
LOG = logging.getLogger(__name__)
def get_engine(uri):
"""Engine creation
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
if driver == 'sqlite':
try:
os.remove(auth['database'])
except OSError:
pass
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
def main():
"""Controller to handle commands
::create: Create test user and database with random names.
::drop: Drop user and database created by previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test '
'databases and users.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
type=int,
help='Number of databases to create.')
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases and users.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
args = parser.parse_args()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
if __name__ == "__main__":
main()

View File

@ -16,19 +16,6 @@
"""Session Handling for SQLAlchemy backend. """Session Handling for SQLAlchemy backend.
Initializing:
* Call `set_defaults()` with the minimal of the following kwargs:
``sql_connection``, ``sqlite_db``
Example:
.. code:: python
session.set_defaults(
sql_connection="sqlite:///var/lib/neutron/sqlite.db",
sqlite_db="/var/lib/neutron/sqlite.db")
Recommended ways to use sessions within this framework: Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. * Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
@ -87,7 +74,7 @@ Recommended ways to use sessions within this framework:
.. code:: python .. code:: python
def create_many_foo(context, foos): def create_many_foo(context, foos):
session = get_session() session = sessionmaker()
with session.begin(): with session.begin():
for foo in foos: for foo in foos:
foo_ref = models.Foo() foo_ref = models.Foo()
@ -95,7 +82,7 @@ Recommended ways to use sessions within this framework:
session.add(foo_ref) session.add(foo_ref)
def update_bar(context, foo_id, newbar): def update_bar(context, foo_id, newbar):
session = get_session() session = sessionmaker()
with session.begin(): with session.begin():
foo_ref = (model_query(context, models.Foo, session). foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id). filter_by(id=foo_id).
@ -142,7 +129,7 @@ Recommended ways to use sessions within this framework:
foo1 = models.Foo() foo1 = models.Foo()
foo2 = models.Foo() foo2 = models.Foo()
foo1.id = foo2.id = 1 foo1.id = foo2.id = 1
session = get_session() session = sessionmaker()
try: try:
with session.begin(): with session.begin():
session.add(foo1) session.add(foo1)
@ -168,7 +155,7 @@ Recommended ways to use sessions within this framework:
.. code:: python .. code:: python
def myfunc(foo): def myfunc(foo):
session = get_session() session = sessionmaker()
with session.begin(): with session.begin():
# do some database things # do some database things
bar = _private_func(foo, session) bar = _private_func(foo, session)
@ -176,7 +163,7 @@ Recommended ways to use sessions within this framework:
def _private_func(foo, session=None): def _private_func(foo, session=None):
if not session: if not session:
session = get_session() session = sessionmaker()
with session.begin(subtransaction=True): with session.begin(subtransaction=True):
# do some other database things # do some other database things
return bar return bar
@ -240,7 +227,7 @@ Efficient use of soft deletes:
def complex_soft_delete_with_synchronization_bar(session=None): def complex_soft_delete_with_synchronization_bar(session=None):
if session is None: if session is None:
session = get_session() session = sessionmaker()
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
count = (model_query(BarModel). count = (model_query(BarModel).
find(some_condition). find(some_condition).
@ -257,7 +244,7 @@ Efficient use of soft deletes:
.. code:: python .. code:: python
def soft_delete_bar_model(): def soft_delete_bar_model():
session = get_session() session = sessionmaker()
with session.begin(): with session.begin():
bar_ref = model_query(BarModel).find(some_condition).first() bar_ref = model_query(BarModel).find(some_condition).first()
# Work with bar_ref # Work with bar_ref
@ -269,7 +256,7 @@ Efficient use of soft deletes:
.. code:: python .. code:: python
def soft_delete_multi_models(): def soft_delete_multi_models():
session = get_session() session = sessionmaker()
with session.begin(): with session.begin():
query = (model_query(BarModel, session=session). query = (model_query(BarModel, session=session).
find(some_condition)) find(some_condition))
@ -293,11 +280,9 @@ Efficient use of soft deletes:
import functools import functools
import logging import logging
import os.path
import re import re
import time import time
from oslo.config import cfg
import six import six
from sqlalchemy import exc as sqla_exc from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener from sqlalchemy.interfaces import PoolListener
@ -306,150 +291,12 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import literal_column
from neutron.openstack.common.db import exception from neutron.openstack.common.db import exception
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _LE, _LW
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
sqlite_db_opts = [
cfg.StrOpt('sqlite_db',
default='neutron.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If True, SQLite uses synchronous mode'),
]
database_opts = [
cfg.StrOpt('connection',
default='sqlite:///' +
os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection',
default='',
secret=True,
help='The SQLAlchemy connection string used to connect to the '
'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
]
CONF = cfg.CONF
CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_ENGINE = None
_MAKER = None
_SLAVE_ENGINE = None
_SLAVE_MAKER = None
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection)
cfg.set_defaults(sqlite_db_opts,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
cfg.set_defaults(database_opts,
max_pool_size=max_pool_size)
if max_overflow is not None:
cfg.set_defaults(database_opts,
max_overflow=max_overflow)
if pool_timeout is not None:
cfg.set_defaults(database_opts,
pool_timeout=pool_timeout)
def cleanup():
global _ENGINE, _MAKER
global _SLAVE_ENGINE, _SLAVE_MAKER
if _MAKER:
_MAKER.close_all()
_MAKER = None
if _ENGINE:
_ENGINE.dispose()
_ENGINE = None
if _SLAVE_MAKER:
_SLAVE_MAKER.close_all()
_SLAVE_MAKER = None
if _SLAVE_ENGINE:
_SLAVE_ENGINE.dispose()
_SLAVE_ENGINE = None
class SqliteForeignKeysListener(PoolListener): class SqliteForeignKeysListener(PoolListener):
"""Ensures that the foreign key constraints are enforced in SQLite. """Ensures that the foreign key constraints are enforced in SQLite.
@ -462,30 +309,6 @@ class SqliteForeignKeysListener(PoolListener):
dbapi_con.execute('pragma foreign_keys=ON') dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
slave_session=False, mysql_traditional_mode=False):
"""Return a SQLAlchemy session."""
global _MAKER
global _SLAVE_MAKER
maker = _MAKER
if slave_session:
maker = _SLAVE_MAKER
if maker is None:
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
mysql_traditional_mode=mysql_traditional_mode)
maker = get_maker(engine, autocommit, expire_on_commit)
if slave_session:
_SLAVE_MAKER = maker
else:
_MAKER = maker
session = maker()
return session
# note(boris-42): In current versions of DB backends unique constraint # note(boris-42): In current versions of DB backends unique constraint
# violation messages follow the structure: # violation messages follow the structure:
# #
@ -509,11 +332,20 @@ def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
# 'c1'") # 'c1'")
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'") # with -' for key 'name_of_our_constraint'")
#
# ibm_db_sa:
# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
# statement, UPDATE statement, or foreign key update caused by a
# DELETE statement are not valid because the primary key, unique
# constraint or unique index identified by "2" constrains table
# "NOVA.KEY_PAIRS" from having duplicate values for the index
# key.
_DUP_KEY_RE_DB = { _DUP_KEY_RE_DB = {
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),) "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
"ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
} }
@ -535,7 +367,7 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
return [columns] return [columns]
return columns[len(uniqbase):].split("0")[1:] return columns[len(uniqbase):].split("0")[1:]
if engine_name not in ["mysql", "sqlite", "postgresql"]: if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
return return
# FIXME(johannes): The usage of the .message attribute has been # FIXME(johannes): The usage of the .message attribute has been
@ -550,6 +382,11 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
else: else:
return return
# NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
# columns so we have to omit that from the DBDuplicateEntry error.
columns = ''
if engine_name != 'ibm_db_sa':
columns = match.group(1) columns = match.group(1)
if engine_name == "sqlite": if engine_name == "sqlite":
@ -592,14 +429,19 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f): def _wrap_db_error(f):
@functools.wraps(f) @functools.wraps(f)
def _wrap(*args, **kwargs): def _wrap(self, *args, **kwargs):
try: try:
return f(*args, **kwargs) assert issubclass(
self.__class__, sqlalchemy.orm.session.Session
), ('_wrap_db_error() can only be applied to methods of '
'subclasses of sqlalchemy.orm.session.Session.')
return f(self, *args, **kwargs)
except UnicodeEncodeError: except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter() raise exception.DBInvalidUnicodeParameter()
except sqla_exc.OperationalError as e: except sqla_exc.OperationalError as e:
_raise_if_db_connection_lost(e, get_engine()) _raise_if_db_connection_lost(e, self.bind)
_raise_if_deadlock_error(e, get_engine().name) _raise_if_deadlock_error(e, self.bind.dialect.name)
# NOTE(comstud): A lot of code is checking for OperationalError # NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now. # so let's not wrap it for now.
raise raise
@ -612,37 +454,14 @@ def _wrap_db_error(f):
# instance_types) there are more than one unique constraint. This # instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate # means we should get names of columns, which values violate
# unique constraint, from error message. # unique constraint, from error message.
_raise_if_duplicate_entry_error(e, get_engine().name) _raise_if_duplicate_entry_error(e, self.bind.dialect.name)
raise exception.DBError(e) raise exception.DBError(e)
except Exception as e: except Exception as e:
LOG.exception(_('DB exception wrapped.')) LOG.exception(_LE('DB exception wrapped.'))
raise exception.DBError(e) raise exception.DBError(e)
return _wrap return _wrap
def get_engine(sqlite_fk=False, slave_engine=False,
mysql_traditional_mode=False):
"""Return a SQLAlchemy engine."""
global _ENGINE
global _SLAVE_ENGINE
engine = _ENGINE
db_uri = CONF.database.connection
if slave_engine:
engine = _SLAVE_ENGINE
db_uri = CONF.database.slave_connection
if engine is None:
engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
mysql_traditional_mode=mysql_traditional_mode)
if slave_engine:
_SLAVE_ENGINE = engine
else:
_ENGINE = engine
return engine
def _synchronous_switch_listener(dbapi_conn, connection_rec): def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode.""" """Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF") dbapi_conn.execute("PRAGMA synchronous = OFF")
@ -684,22 +503,78 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
cursor.execute(ping_sql) cursor.execute(ping_sql)
except Exception as ex: except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg) LOG.warning(msg)
# if the database server has gone away, all connections in the pool
# have become invalid and we can safely close all of them here,
# rather than waste time on checking of every single connection
engine.dispose()
# this will be handled by SQLAlchemy and will force it to create
# a new connection and retry the original action
raise sqla_exc.DisconnectionError(msg) raise sqla_exc.DisconnectionError(msg)
else: else:
raise raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy): def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
"""Set engine mode to 'traditional'. """Set the sql_mode session variable.
Required to prevent silent truncates at insert or update operations MySQL supports several server modes. The default is None, but sessions
under MySQL. By default MySQL truncates inserted string if it longer may choose to enable server modes like TRADITIONAL, ANSI,
than a declared field just with warning. That is fraught with data several STRICT_* modes and others.
corruption.
Note: passing in '' (empty string) for sql_mode clears
the SQL mode for the session, overriding a potentially set
server default.
""" """
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
def _mysql_get_effective_sql_mode(engine):
"""Returns the effective SQL mode for connections from the engine pool.
Returns ``None`` if the mode isn't available, otherwise returns the mode.
"""
# Get the real effective SQL mode. Even when unset by
# our own config, the server may still be operating in a specific
# SQL mode as set by the server configuration.
# Also note that the checkout listener will be called on execute to
# set the mode if it's registered.
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
if row is None:
return
return row[1]
def _mysql_check_effective_sql_mode(engine):
"""Logs a message based on the effective SQL mode for MySQL connections."""
realmode = _mysql_get_effective_sql_mode(engine)
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
return
LOG.debug('MySQL server mode set to %s', realmode)
# 'TRADITIONAL' mode enables several other modes, so
# we need a substring match here
if not ('TRADITIONAL' in realmode.upper() or
'STRICT_ALL_TABLES' in realmode.upper()):
LOG.warning(_LW("MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
def _mysql_set_mode_callback(engine, sql_mode):
if sql_mode is not None:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=sql_mode)
sqlalchemy.event.listen(engine, 'connect', mode_callback)
_mysql_check_effective_sql_mode(engine)
def _is_db_connection_error(args): def _is_db_connection_error(args):
@ -726,69 +601,63 @@ def _raise_if_db_connection_lost(error, engine):
raise exception.DBConnectionError(error) raise exception.DBConnectionError(error)
def create_engine(sql_connection, sqlite_fk=False, def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
mysql_traditional_mode=False): idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10):
"""Return a new SQLAlchemy engine.""" """Return a new SQLAlchemy engine."""
# NOTE(geekinutah): At this point we could be connecting to the normal
# db handle or the slave db handle. Things like
# _wrap_db_error aren't going to work well if their
# backends don't match. Let's check.
_assert_matching_drivers()
connection_dict = sqlalchemy.engine.url.make_url(sql_connection) connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = { engine_args = {
"pool_recycle": CONF.database.idle_timeout, "pool_recycle": idle_timeout,
"echo": False,
'convert_unicode': True, 'convert_unicode': True,
} }
# Map our SQL debug level to SQLAlchemy's options logger = logging.getLogger('sqlalchemy.engine')
if CONF.database.connection_debug >= 100:
engine_args['echo'] = 'debug' # Map SQL debug level to Python log level
elif CONF.database.connection_debug >= 50: if connection_debug >= 100:
engine_args['echo'] = True logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
if "sqlite" in connection_dict.drivername: if "sqlite" in connection_dict.drivername:
if sqlite_fk: if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()] engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool engine_args["poolclass"] = NullPool
if CONF.database.connection == "sqlite://": if sql_connection == "sqlite://":
engine_args["poolclass"] = StaticPool engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False} engine_args["connect_args"] = {'check_same_thread': False}
else: else:
if CONF.database.max_pool_size is not None: if max_pool_size is not None:
engine_args['pool_size'] = CONF.database.max_pool_size engine_args['pool_size'] = max_pool_size
if CONF.database.max_overflow is not None: if max_overflow is not None:
engine_args['max_overflow'] = CONF.database.max_overflow engine_args['max_overflow'] = max_overflow
if CONF.database.pool_timeout is not None: if pool_timeout is not None:
engine_args['pool_timeout'] = CONF.database.pool_timeout engine_args['pool_timeout'] = pool_timeout
engine = sqlalchemy.create_engine(sql_connection, **engine_args) engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield) sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if engine.name in ['mysql', 'ibm_db_sa']: if engine.name in ['mysql', 'ibm_db_sa']:
callback = functools.partial(_ping_listener, engine) ping_callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback) sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql': if engine.name == 'mysql':
if mysql_traditional_mode: if mysql_sql_mode:
sqlalchemy.event.listen(engine, 'checkout', _mysql_set_mode_callback(engine, mysql_sql_mode)
_set_mode_traditional)
else:
LOG.warning(_("This application has not enabled MySQL "
"traditional mode, which means silent "
"data corruption may occur. "
"Please encourage the application "
"developers to enable this mode."))
elif 'sqlite' in connection_dict.drivername: elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous: if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect', sqlalchemy.event.listen(engine, 'connect',
_synchronous_switch_listener) _synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
if (CONF.database.connection_trace and if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
engine.dialect.dbapi.__name__ == 'MySQLdb'):
_patch_mysqldb_with_stacktrace_comments() _patch_mysqldb_with_stacktrace_comments()
try: try:
@ -797,15 +666,15 @@ def create_engine(sql_connection, sqlite_fk=False,
if not _is_db_connection_error(e.args[0]): if not _is_db_connection_error(e.args[0]):
raise raise
remaining = CONF.database.max_retries remaining = max_retries
if remaining == -1: if remaining == -1:
remaining = 'infinite' remaining = 'infinite'
while True: while True:
msg = _('SQL connection failed. %s attempts left.') msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg % remaining) LOG.warning(msg % remaining)
if remaining != 'infinite': if remaining != 'infinite':
remaining -= 1 remaining -= 1
time.sleep(CONF.database.retry_interval) time.sleep(retry_interval)
try: try:
engine.connect() engine.connect()
break break
@ -892,13 +761,144 @@ def _patch_mysqldb_with_stacktrace_comments():
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
def _assert_matching_drivers(): class EngineFacade(object):
"""Make sure slave handle and normal handle have the same driver.""" """A helper class for removing of global engine instances from neutron.db.
# NOTE(geekinutah): There's no use case for writing to one backend and
# reading from another. Who knows what the future holds?
if CONF.database.slave_connection == '':
return
normal = sqlalchemy.engine.url.make_url(CONF.database.connection) As a library, neutron.db can't decide where to store/when to create engine
slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection) and sessionmaker instances, so this must be left for a target application.
assert normal.drivername == slave.drivername
On the other hand, in order to simplify the adoption of neutron.db changes,
we'll provide a helper class, which creates engine and sessionmaker
on its instantiation and provides get_engine()/get_session() methods
that are compatible with corresponding utility functions that currently
exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to
be global), but they will be stored in the app context, rather that in the
neutron.db context.
Note: using of this helper is completely optional and you are encouraged to
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection,
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
0=None, 100=Everything (defaults to 0)
:keyword max_pool_size: maximum number of SQL connections to keep open
in a pool (defaults to SQLAlchemy settings)
:keyword max_overflow: if set, use this value for max_overflow with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword pool_timeout: if set, use this value for pool_timeout with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
(defaults to True)
:keyword connection_trace: add python stack traces to SQL as comment
strings (defaults to False)
:keyword max_retries: maximum db connection retries during startup.
(setting -1 implies an infinite retry count)
(defaults to 10)
:keyword retry_interval: interval between retries of opening a sql
connection (defaults to 10)
"""
super(EngineFacade, self).__init__()
self._engine = create_engine(
sql_connection=sql_connection,
sqlite_fk=sqlite_fk,
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
idle_timeout=kwargs.get('idle_timeout', 3600),
connection_debug=kwargs.get('connection_debug', 0),
max_pool_size=kwargs.get('max_pool_size'),
max_overflow=kwargs.get('max_overflow'),
pool_timeout=kwargs.get('pool_timeout'),
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
connection_trace=kwargs.get('connection_trace', False),
max_retries=kwargs.get('max_retries', 10),
retry_interval=kwargs.get('retry_interval', 10))
self._session_maker = get_maker(
engine=self._engine,
autocommit=autocommit,
expire_on_commit=expire_on_commit)
def get_engine(self):
"""Get the engine instance (note, that it's shared)."""
return self._engine
def get_session(self, **kwargs):
"""Get a Session instance.
If passed, keyword arguments values override the ones used when the
sessionmaker instance was created.
:keyword autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:keyword expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
for arg in kwargs:
if arg not in ('autocommit', 'expire_on_commit'):
del kwargs[arg]
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, connection_string, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param connection_string: SQLAlchemy connection string
:type connection_string: string
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(sql_connection=connection_string,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
**dict(conf.database.items()))

View File

@ -0,0 +1,153 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import functools
import os
import fixtures
import six
from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common.db.sqlalchemy import utils
from neutron.openstack.common.fixture import lockutils
from neutron.openstack.common import test
class DbFixture(fixtures.Fixture):
"""Basic database fixture.
Allows to run tests on various db backends, such as SQLite, MySQL and
PostgreSQL. By default use sqlite backend. To override default backend
uri set env variable OS_TEST_DBAPI_CONNECTION with database admin
credentials for specific backend.
"""
def _get_uri(self):
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
def __init__(self, test):
super(DbFixture, self).__init__()
self.test = test
def setUp(self):
super(DbFixture, self).setUp()
self.test.engine = session.create_engine(self._get_uri())
self.test.sessionmaker = session.get_maker(self.test.engine)
self.addCleanup(self.test.engine.dispose)
class DbTestCase(test.BaseTestCase):
"""Base class for testing of DB code.
Using `DbFixture`. Intended to be the main database test case to use all
the tests on a given backend with user defined uri. Backend specific
tests should be decorated with `backend_specific` decorator.
"""
FIXTURE = DbFixture
def setUp(self):
super(DbTestCase, self).setUp()
self.useFixture(self.FIXTURE(self))
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
def backend_specific(*dialects):
"""Decorator to skip backend specific tests on inappropriate engines.
::dialects: list of dialects names under which the test will be launched.
"""
def wrap(f):
@functools.wraps(f)
def ins_wrap(self):
if not set(dialects).issubset(ALLOWED_DIALECTS):
raise ValueError(
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
if self.engine.name not in dialects:
msg = ('The test "%s" can be run '
'only on %s. Current engine is %s.')
args = (f.__name__, ' '.join(dialects), self.engine.name)
self.skip(msg % args)
else:
return f(self)
return ins_wrap
return wrap
@six.add_metaclass(abc.ABCMeta)
class OpportunisticFixture(DbFixture):
"""Base fixture to use default CI databases.
The databases exist in OpenStack CI infrastructure. But for the
correct functioning in local environment the databases must be
created manually.
"""
DRIVER = abc.abstractproperty(lambda: None)
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
def _get_uri(self):
return utils.get_connect_string(backend=self.DRIVER,
user=self.USERNAME,
passwd=self.PASSWORD,
database=self.DBNAME)
@six.add_metaclass(abc.ABCMeta)
class OpportunisticTestCase(DbTestCase):
"""Base test case to use default CI databases.
The subclasses of the test case are running only when openstack_citest
database is available otherwise a tests will be skipped.
"""
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
# TODO(bnemec): Remove this once infra is ready for
# https://review.openstack.org/#/c/74963/ to merge.
self.useFixture(lockutils.LockFixture('opportunistic-db'))
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,
'passwd': self.FIXTURE.PASSWORD,
'database': self.FIXTURE.DBNAME}
if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
msg = '%s backend is not available.' % self.FIXTURE.DRIVER
return self.skip(msg)
super(OpportunisticTestCase, self).setUp()
class MySQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'mysql'
class PostgreSQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'postgresql'
class MySQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = MySQLOpportunisticFixture
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = PostgreSQLOpportunisticFixture

View File

@ -19,7 +19,6 @@
import logging import logging
import re import re
from migrate.changeset import UniqueConstraint
import sqlalchemy import sqlalchemy
from sqlalchemy import Boolean from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint from sqlalchemy import CheckConstraint
@ -30,14 +29,16 @@ from sqlalchemy import func
from sqlalchemy import Index from sqlalchemy import Index
from sqlalchemy import Integer from sqlalchemy import Integer
from sqlalchemy import MetaData from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String from sqlalchemy import String
from sqlalchemy import Table from sqlalchemy import Table
from sqlalchemy.types import NullType from sqlalchemy.types import NullType
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import context as request_context
from neutron.openstack.common.db.sqlalchemy import models
from neutron.openstack.common.gettextutils import _, _LI, _LW
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
@ -93,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
if 'id' not in sort_keys: if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check # TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id # the actual primary key, rather than assuming its id
LOG.warning(_('Id not in sort_keys; is sort_keys unique?')) LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs)) assert(not (sort_dir and sort_dirs))
@ -156,6 +157,98 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
return query return query
def _read_deleted_filter(query, db_model, read_deleted):
if 'deleted' not in db_model.__table__.columns:
raise ValueError(_("There is no `deleted` column in `%s` table. "
"Project doesn't use soft-deleted feature.")
% db_model.__name__)
default_deleted_value = db_model.__table__.c.deleted.default.arg
if read_deleted == 'no':
query = query.filter(db_model.deleted == default_deleted_value)
elif read_deleted == 'yes':
pass # omit the filter to include deleted and active
elif read_deleted == 'only':
query = query.filter(db_model.deleted != default_deleted_value)
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
return query
def _project_filter(query, db_model, context, project_only):
if project_only and 'project_id' not in db_model.__table__.columns:
raise ValueError(_("There is no `project_id` column in `%s` table.")
% db_model.__name__)
if request_context.is_user_context(context) and project_only:
if project_only == 'allow_none':
is_none = None
query = query.filter(or_(db_model.project_id == context.project_id,
db_model.project_id == is_none))
else:
query = query.filter(db_model.project_id == context.project_id)
return query
def model_query(context, model, session, args=None, project_only=False,
read_deleted=None):
"""Query helper that accounts for context's `read_deleted` field.
:param context: context to query under
:param model: Model to query. Must be a subclass of ModelBase.
:type model: models.ModelBase
:param session: The session to use.
:type session: sqlalchemy.orm.session.Session
:param args: Arguments to query. If None - model is used.
:type args: tuple
:param project_only: If present and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
:type project_only: bool
:param read_deleted: If present, overrides context's read_deleted field.
:type read_deleted: bool
Usage:
..code:: python
result = (utils.model_query(context, models.Instance, session=session)
.filter_by(uuid=instance_uuid)
.all())
query = utils.model_query(
context, Node,
session=session,
args=(func.count(Node.id), func.sum(Node.ram))
).filter_by(project_id=project_id)
"""
if not read_deleted:
if hasattr(context, 'read_deleted'):
# NOTE(viktors): some projects use `read_deleted` attribute in
# their contexts instead of `show_deleted`.
read_deleted = context.read_deleted
else:
read_deleted = context.show_deleted
if not issubclass(model, models.ModelBase):
raise TypeError(_("model should be a subclass of ModelBase"))
query = session.query(model) if not args else session.query(*args)
query = _read_deleted_filter(query, model, read_deleted)
query = _project_filter(query, model, context, project_only)
return query
def get_table(engine, name): def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db. """Returns an sqlalchemy table dynamically from db.
@ -207,6 +300,10 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance): **col_name_col_instance):
"""Drop unique constraint from table. """Drop unique constraint from table.
DEPRECATED: this function is deprecated and will be removed from neutron.db
in a few releases. Please use UniqueConstraint.drop() method directly for
sqlalchemy-migrate migration scripts.
This method drops UC from table and works for mysql, postgresql and sqlite. This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction. In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their Sqlalchemy doesn't support some sqlite column types and replaces their
@ -223,6 +320,8 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
types by sqlite. For example BigInteger. types by sqlite. For example BigInteger.
""" """
from migrate.changeset import UniqueConstraint
meta = MetaData() meta = MetaData()
meta.bind = migrate_engine meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True) t = Table(table_name, meta, autoload=True)
@ -262,8 +361,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
columns_for_select = [func.max(table.c.id)] columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by) columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select, duplicated_rows_select = sqlalchemy.sql.select(
group_by=columns_for_group_by, columns_for_select, group_by=columns_for_group_by,
having=func.count(table.c.id) > 1) having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select): for row in migrate_engine.execute(duplicated_rows_select):
@ -274,9 +373,10 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
for name in uc_column_names: for name in uc_column_names:
delete_condition &= table.c[name] == row[name] delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition) rows_to_delete_select = sqlalchemy.sql.select(
[table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall(): for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_("Deleting duplicated row with id: %(id)s from table: " LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name)) "%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete: if use_soft_delete:
@ -385,7 +485,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
else: else:
c_select.append(table.c.deleted == table.c.id) c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select)) ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
migrate_engine.execute(ins) migrate_engine.execute(ins)
table.drop() table.drop()

View File

@ -23,8 +23,8 @@ from oslo.config import cfg
from neutron.common import config from neutron.common import config
from neutron import context from neutron import context
from neutron.db import api as session
from neutron import manager from neutron import manager
from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import excutils from neutron.openstack.common import excutils
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -118,7 +118,7 @@ class RpcWorker(object):
# We may have just forked from parent process. A quick disposal of the # We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are # existing sql connections avoids producing errors later when they are
# discovered to be broken. # discovered to be broken.
session.get_engine(sqlite_fk=True).pool.dispose() session.get_engine().pool.dispose()
self._server = self._plugin.start_rpc_listener() self._server = self._plugin.start_rpc_listener()
def wait(self): def wait(self):