Merge "Sync latest oslo.db code into neutron"

This commit is contained in:
Jenkins 2014-02-23 22:21:40 +00:00 committed by Gerrit Code Review
commit 14b28f75c4
7 changed files with 803 additions and 250 deletions

View File

@ -1,16 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudscaling Group, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 Rackspace Hosting # Copyright (c) 2013 Rackspace Hosting
# All Rights Reserved. # All Rights Reserved.
# #
@ -21,27 +19,21 @@ Supported configuration options:
The following two parameters are in the 'database' group: The following two parameters are in the 'database' group:
`backend`: DB backend name or full module path to DB backend module. `backend`: DB backend name or full module path to DB backend module.
`use_tpool`: Enable thread pooling of DB API calls.
A DB backend module should implement a method named 'get_backend' which A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB takes no arguments. The method can return any object that implements DB
API methods. API methods.
*NOTE*: There are bugs in eventlet when using tpool combined with
threading locks. The python logging module happens to use such locks. To
work around this issue, be sure to specify thread=False with
eventlet.monkey_patch().
A bug for eventlet has been filed here:
https://bitbucket.org/eventlet/eventlet/issue/137/
""" """
import functools import functools
import logging
import time
from oslo.config import cfg from oslo.config import cfg
from neutron.openstack.common.db import exception
from neutron.openstack.common.gettextutils import _ # noqa
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import lockutils
db_opts = [ db_opts = [
@ -50,57 +42,95 @@ db_opts = [
deprecated_name='db_backend', deprecated_name='db_backend',
deprecated_group='DEFAULT', deprecated_group='DEFAULT',
help='The backend to use for db'), help='The backend to use for db'),
cfg.BoolOpt('use_tpool', cfg.BoolOpt('use_db_reconnect',
default=False, default=False,
deprecated_name='dbapi_use_tpool', help='Enable the experimental use of database reconnect '
deprecated_group='DEFAULT', 'on connection lost'),
help='Enable the experimental use of thread pooling for ' cfg.IntOpt('db_retry_interval',
'all DB API calls') default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(db_opts, 'database') CONF.register_opts(db_opts, 'database')
LOG = logging.getLogger(__name__)
def safe_for_db_retry(f):
"""Enable db-retry for decorated function, if config option enabled."""
f.__dict__['enable_retry'] = True
return f
def _wrap_db_retry(f):
"""Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect`
in config, this decorator will be applied to all db.api functions,
marked with @safe_for_db_retry decorator.
Decorator catchs DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
next_interval = CONF.database.db_retry_interval
remaining = CONF.database.db_max_retries
while True:
try:
return f(*args, **kwargs)
except exception.DBConnectionError as e:
if remaining == 0:
LOG.exception(_('DB exceeded retry limit.'))
raise exception.DBError(e)
if remaining != -1:
remaining -= 1
LOG.exception(_('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so this
# effectively yields the execution context to
# another green thread.
time.sleep(next_interval)
if CONF.database.db_inc_retry_interval:
next_interval = min(
next_interval * 2,
CONF.database.db_max_retry_interval
)
return wrapper
class DBAPI(object): class DBAPI(object):
def __init__(self, backend_mapping=None): def __init__(self, backend_mapping=None):
if backend_mapping is None: if backend_mapping is None:
backend_mapping = {} backend_mapping = {}
self.__backend = None
self.__backend_mapping = backend_mapping
@lockutils.synchronized('dbapi_backend', 'neutron-')
def __get_backend(self):
"""Get the actual backend. May be a module or an instance of
a class. Doesn't matter to us. We do this synchronized as it's
possible multiple greenthreads started very quickly trying to do
DB calls and eventlet can switch threads before self.__backend gets
assigned.
"""
if self.__backend:
# Another thread assigned it
return self.__backend
backend_name = CONF.database.backend backend_name = CONF.database.backend
self.__use_tpool = CONF.database.use_tpool
if self.__use_tpool:
from eventlet import tpool
self.__tpool = tpool
# Import the untranslated name if we don't have a # Import the untranslated name if we don't have a
# mapping. # mapping.
backend_path = self.__backend_mapping.get(backend_name, backend_path = backend_mapping.get(backend_name, backend_name)
backend_name)
backend_mod = importutils.import_module(backend_path) backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend() self.__backend = backend_mod.get_backend()
return self.__backend
def __getattr__(self, key): def __getattr__(self, key):
backend = self.__backend or self.__get_backend() attr = getattr(self.__backend, key)
attr = getattr(backend, key)
if not self.__use_tpool or not hasattr(attr, '__call__'): if not hasattr(attr, '__call__'):
return attr return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry
# on disconnect.
if CONF.database.use_db_reconnect and hasattr(attr, 'enable_retry'):
attr = _wrap_db_retry(attr)
def tpool_wrapper(*args, **kwargs): return attr
return self.__tpool.execute(attr, *args, **kwargs)
functools.update_wrapper(tpool_wrapper, attr)
return tpool_wrapper

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -18,6 +16,8 @@
"""DB related custom exceptions.""" """DB related custom exceptions."""
import six
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _
@ -25,7 +25,7 @@ class DBError(Exception):
"""Wraps an implementation specific exception.""" """Wraps an implementation specific exception."""
def __init__(self, inner_exception=None): def __init__(self, inner_exception=None):
self.inner_exception = inner_exception self.inner_exception = inner_exception
super(DBError, self).__init__(str(inner_exception)) super(DBError, self).__init__(six.text_type(inner_exception))
class DBDuplicateEntry(DBError): class DBDuplicateEntry(DBError):
@ -43,3 +43,14 @@ class DBDeadlock(DBError):
class DBInvalidUnicodeParameter(Exception): class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: " message = _("Invalid Parameter: "
"Unicode is not supported by the current database.") "Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(message)
class DBConnectionError(DBError):
"""Wraps connection specific exception."""
pass

View File

@ -1,16 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudscaling Group, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. # Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
@ -22,11 +20,13 @@
SQLAlchemy models. SQLAlchemy models.
""" """
import six
from sqlalchemy import Column, Integer from sqlalchemy import Column, Integer
from sqlalchemy import DateTime from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper from sqlalchemy.orm import object_mapper
from neutron.openstack.common.db.sqlalchemy.session import get_session from neutron.openstack.common.db.sqlalchemy import session as sa
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
@ -37,15 +37,15 @@ class ModelBase(object):
def save(self, session=None): def save(self, session=None):
"""Save this object.""" """Save this object."""
if not session: if not session:
session = get_session() session = sa.get_session()
# NOTE(boris-42): This part of code should be look like: # NOTE(boris-42): This part of code should be look like:
# sesssion.add(self) # session.add(self)
# session.flush() # session.flush()
# But there is a bug in sqlalchemy and eventlet that # But there is a bug in sqlalchemy and eventlet that
# raises NoneType exception if there is no running # raises NoneType exception if there is no running
# transaction and rollback is called. As long as # transaction and rollback is called. As long as
# sqlalchemy has this bug we have to create transaction # sqlalchemy has this bug we have to create transaction
# explicity. # explicitly.
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
session.add(self) session.add(self)
session.flush() session.flush()
@ -59,23 +59,34 @@ class ModelBase(object):
def get(self, key, default=None): def get(self, key, default=None):
return getattr(self, key, default) return getattr(self, key, default)
@property
def _extra_keys(self):
"""Specifies custom fields
Subclasses can override this property to return a list
of custom fields that should be included in their dict
representation.
For reference check tests/db/sqlalchemy/test_models.py
"""
return []
def __iter__(self): def __iter__(self):
columns = dict(object_mapper(self).columns).keys() columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked # NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name' # up, beyond the actual db columns. An example would be the 'name'
# property for an Instance. # property for an Instance.
if hasattr(self, '_extra_keys'): columns.extend(self._extra_keys)
columns.extend(self._extra_keys())
self._i = iter(columns) self._i = iter(columns)
return self return self
def next(self): def next(self):
n = self._i.next() n = six.advance_iterator(self._i)
return n, getattr(self, n) return n, getattr(self, n)
def update(self, values): def update(self, values):
"""Make the model object behave like a dict.""" """Make the model object behave like a dict."""
for k, v in values.iteritems(): for k, v in six.iteritems(values):
setattr(self, k, v) setattr(self, k, v)
def iteritems(self): def iteritems(self):
@ -84,15 +95,15 @@ class ModelBase(object):
Includes attributes from joins. Includes attributes from joins.
""" """
local = dict(self) local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems() joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_']) if not k[0] == '_'])
local.update(joined) local.update(joined)
return local.iteritems() return six.iteritems(local)
class TimestampMixin(object): class TimestampMixin(object):
created_at = Column(DateTime, default=timeutils.utcnow) created_at = Column(DateTime, default=lambda: timeutils.utcnow())
updated_at = Column(DateTime, onupdate=timeutils.utcnow) updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
class SoftDeleteMixin(object): class SoftDeleteMixin(object):

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -20,41 +18,45 @@
Initializing: Initializing:
* Call set_defaults with the minimal of the following kwargs: * Call `set_defaults()` with the minimal of the following kwargs:
sql_connection, sqlite_db ``sql_connection``, ``sqlite_db``
Example: Example:
.. code:: python
session.set_defaults( session.set_defaults(
sql_connection="sqlite:///var/lib/neutron/sqlite.db", sql_connection="sqlite:///var/lib/neutron/sqlite.db",
sqlite_db="/var/lib/neutron/sqlite.db") sqlite_db="/var/lib/neutron/sqlite.db")
Recommended ways to use sessions within this framework: Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with AUTOCOMMIT=1. * Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
model_query() will implicitly use a session when called without one `model_query()` will implicitly use a session when called without one
supplied. This is the ideal situation because it will allow queries supplied. This is the ideal situation because it will allow queries
to be automatically retried if the database connection is interrupted. to be automatically retried if the database connection is interrupted.
Note: Automatic retry will be enabled in a future patch. .. note:: Automatic retry will be enabled in a future patch.
It is generally fine to issue several queries in a row like this. Even though It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at functionality should be handled at a logical level. For an example, look at
the code around quotas and reservation_rollback(). the code around quotas and `reservation_rollback()`.
Examples: Examples:
.. code:: python
def get_foo(context, foo): def get_foo(context, foo):
return model_query(context, models.Foo).\ return (model_query(context, models.Foo).
filter_by(foo=foo).\ filter_by(foo=foo).
first() first())
def update_foo(context, id, newfoo): def update_foo(context, id, newfoo):
model_query(context, models.Foo).\ (model_query(context, models.Foo).
filter_by(id=id).\ filter_by(id=id).
update({'foo': newfoo}) update({'foo': newfoo}))
def create_foo(context, values): def create_foo(context, values):
foo_ref = models.Foo() foo_ref = models.Foo()
@ -63,18 +65,26 @@ Recommended ways to use sessions within this framework:
return foo_ref return foo_ref
* Within the scope of a single method, keeping all the reads and writes within * Within the scope of a single method, keep all the reads and writes within
the context managed by a single session. In this way, the session's __exit__ the context managed by a single session. In this way, the session's
handler will take care of calling flush() and commit() for you. `__exit__` handler will take care of calling `flush()` and `commit()` for
If using this approach, you should not explicitly call flush() or commit(). you. If using this approach, you should not explicitly call `flush()` or
Any error within the context of the session will cause the session to emit `commit()`. Any error within the context of the session will cause the
a ROLLBACK. If the connection is dropped before this is possible, the session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
database will implicitly rollback the transaction. raised in `session`'s `__exit__` handler, and any try/except within the
context managed by `session` will not be triggered. And catching other
non-database errors in the session will not trigger the ROLLBACK, so
exception handlers should always be outside the session, unless the
developer wants to do a partial commit on purpose. If the connection is
dropped before this is possible, the database will implicitly roll back the
transaction.
Note: statements in the session scope will not be automatically retried. .. note:: Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you If you create models within the session, they need to be added, but you
do not need to call model.save() do not need to call `model.save()`:
.. code:: python
def create_many_foo(context, foos): def create_many_foo(context, foos):
session = get_session() session = get_session()
@ -87,36 +97,62 @@ Recommended ways to use sessions within this framework:
def update_bar(context, foo_id, newbar): def update_bar(context, foo_id, newbar):
session = get_session() session = get_session()
with session.begin(): with session.begin():
foo_ref = model_query(context, models.Foo, session).\ foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).\ filter_by(id=foo_id).
first() first())
model_query(context, models.Bar, session).\ (model_query(context, models.Bar, session).
filter_by(id=foo_ref['bar_id']).\ filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}) update({'bar': newbar}))
Note: update_bar is a trivially simple example of using "with session.begin". .. note:: `update_bar` is a trivially simple example of using
Whereas create_many_foo is a good example of when a transaction is needed, ``with session.begin``. Whereas `create_many_foo` is a good example of
it is always best to use as few queries as possible. The two queries in when a transaction is needed, it is always best to use as few queries as
update_bar can be better expressed using a single query which avoids possible.
the need for an explicit transaction. It can be expressed like so:
The two queries in `update_bar` can be better expressed using a single query
which avoids the need for an explicit transaction. It can be expressed like
so:
.. code:: python
def update_bar(context, foo_id, newbar): def update_bar(context, foo_id, newbar):
subq = model_query(context, models.Foo.id).\ subq = (model_query(context, models.Foo.id).
filter_by(id=foo_id).\ filter_by(id=foo_id).
limit(1).\ limit(1).
subquery() subquery())
model_query(context, models.Bar).\ (model_query(context, models.Bar).
filter_by(id=subq.as_scalar()).\ filter_by(id=subq.as_scalar()).
update({'bar': newbar}) update({'bar': newbar}))
For reference, this emits approximagely the following SQL statement: For reference, this emits approximately the following SQL statement:
.. code:: sql
UPDATE bar SET bar = ${newbar} UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
exception while using ``with session.begin``. Here create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code:: python
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = get_session()
try:
with session.begin():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
* Passing an active session between methods. Sessions should only be passed * Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call session.begin() on an existing SQLAlchemy will throw an error when you call `session.begin()` on an existing
transaction. Public methods should not accept a session parameter and should transaction. Public methods should not accept a session parameter and should
not be involved in sessions within the caller's scope. not be involved in sessions within the caller's scope.
@ -129,6 +165,8 @@ Recommended ways to use sessions within this framework:
becomes less clear in this situation. When this is needed for code clarity, becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented. it should be clearly documented.
.. code:: python
def myfunc(foo): def myfunc(foo):
session = get_session() session = get_session()
with session.begin(): with session.begin():
@ -148,13 +186,13 @@ There are some things which it is best to avoid:
* Don't keep a transaction open any longer than necessary. * Don't keep a transaction open any longer than necessary.
This means that your "with session.begin()" block should be as short This means that your ``with session.begin()`` block should be as short
as possible, while still containing all the related calls for that as possible, while still containing all the related calls for that
transaction. transaction.
* Avoid "with_lockmode('UPDATE')" when possible. * Avoid ``with_lockmode('UPDATE')`` when possible.
In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
any rows, it will take a gap-lock. This is a form of write-lock on the any rows, it will take a gap-lock. This is a form of write-lock on the
"gap" where no rows exist, and prevents any other writes to that space. "gap" where no rows exist, and prevents any other writes to that space.
This can effectively prevent any INSERT into a table by locking the gap This can effectively prevent any INSERT into a table by locking the gap
@ -165,16 +203,19 @@ There are some things which it is best to avoid:
number of rows matching a query, and if only one row is returned, number of rows matching a query, and if only one row is returned,
then issue the SELECT FOR UPDATE. then issue the SELECT FOR UPDATE.
The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE. The better long-term solution is to use
``INSERT .. ON DUPLICATE KEY UPDATE``.
However, this can not be done until the "deleted" columns are removed and However, this can not be done until the "deleted" columns are removed and
proper UNIQUE constraints are added to the tables. proper UNIQUE constraints are added to the tables.
Enabling soft deletes: Enabling soft deletes:
* To use/enable soft-deletes, the SoftDeleteMixin must be added * To use/enable soft-deletes, the `SoftDeleteMixin` must be added
to your model class. For example: to your model class. For example:
.. code:: python
class NovaBase(models.SoftDeleteMixin, models.ModelBase): class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass pass
@ -182,13 +223,15 @@ Enabling soft deletes:
Efficient use of soft deletes: Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted: * There are two possible ways to mark a record as deleted:
model.soft_delete() and query.soft_delete(). `model.soft_delete()` and `query.soft_delete()`.
model.soft_delete() method works with single already fetched entry. The `model.soft_delete()` method works with a single already-fetched entry.
query.soft_delete() makes only one db request for all entries that correspond `query.soft_delete()` makes only one db request for all entries that
to query. correspond to the query.
* In almost all cases you should use query.soft_delete(). Some examples: * In almost all cases you should use `query.soft_delete()`. Some examples:
.. code:: python
def soft_delete_bar(): def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete() count = model_query(BarModel).find(some_condition).soft_delete()
@ -199,18 +242,20 @@ Efficient use of soft deletes:
if session is None: if session is None:
session = get_session() session = get_session()
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
count = model_query(BarModel).\ count = (model_query(BarModel).
find(some_condition).\ find(some_condition).
soft_delete(synchronize_session=True) soft_delete(synchronize_session=True))
# Here synchronize_session is required, because we # Here synchronize_session is required, because we
# don't know what is going on in outer session. # don't know what is going on in outer session.
if count == 0: if count == 0:
raise Exception("0 entries were soft deleted") raise Exception("0 entries were soft deleted")
* There is only one situation where model.soft_delete() is appropriate: when * There is only one situation where `model.soft_delete()` is appropriate: when
you fetch a single record, work with it, and mark it as deleted in the same you fetch a single record, work with it, and mark it as deleted in the same
transaction. transaction.
.. code:: python
def soft_delete_bar_model(): def soft_delete_bar_model():
session = get_session() session = get_session()
with session.begin(): with session.begin():
@ -219,13 +264,15 @@ Efficient use of soft deletes:
bar_ref.soft_delete(session=session) bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to query and However, if you need to work with all entries that correspond to query and
then soft delete them you should use query.soft_delete() method: then soft delete them you should use the `query.soft_delete()` method:
.. code:: python
def soft_delete_multi_models(): def soft_delete_multi_models():
session = get_session() session = get_session()
with session.begin(): with session.begin():
query = model_query(BarModel, session=session).\ query = (model_query(BarModel, session=session).
find(some_condition) find(some_condition))
model_refs = query.all() model_refs = query.all()
# Work with model_refs # Work with model_refs
query.soft_delete(synchronize_session=False) query.soft_delete(synchronize_session=False)
@ -233,23 +280,26 @@ Efficient use of soft deletes:
# session and these entries are not used after this. # session and these entries are not used after this.
When working with many rows, it is very important to use query.soft_delete, When working with many rows, it is very important to use query.soft_delete,
which issues a single query. Using model.soft_delete(), as in the following which issues a single query. Using `model.soft_delete()`, as in the following
example, is very inefficient. example, is very inefficient.
.. code:: python
for bar_ref in bar_refs: for bar_ref in bar_refs:
bar_ref.soft_delete(session=session) bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests. # This will produce count(bar_refs) db requests.
""" """
import functools
import logging
import os.path import os.path
import re import re
import time import time
from eventlet import greenthread
from oslo.config import cfg from oslo.config import cfg
import six import six
from sqlalchemy import exc as sqla_exc from sqlalchemy import exc as sqla_exc
import sqlalchemy.interfaces
from sqlalchemy.interfaces import PoolListener from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.pool import NullPool, StaticPool
@ -257,18 +307,15 @@ from sqlalchemy.sql.expression import literal_column
from neutron.openstack.common.db import exception from neutron.openstack.common.db import exception
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
DEFAULT = 'DEFAULT'
sqlite_db_opts = [ sqlite_db_opts = [
cfg.StrOpt('sqlite_db', cfg.StrOpt('sqlite_db',
default='neutron.sqlite', default='neutron.sqlite',
help='the filename to use with sqlite'), help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous', cfg.BoolOpt('sqlite_synchronous',
default=True, default=True,
help='If true, use synchronous mode for sqlite'), help='If True, SQLite uses synchronous mode'),
] ]
database_opts = [ database_opts = [
@ -278,76 +325,80 @@ database_opts = [
'../', '$sqlite_db')), '../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'database', 'database',
deprecated_name='sql_connection', secret=True,
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection', deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DATABASE')], group='DEFAULT'),
secret=True), cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection', cfg.StrOpt('slave_connection',
default='', default='',
secret=True,
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '
'slave database', 'slave database'),
secret=True),
cfg.IntOpt('idle_timeout', cfg.IntOpt('idle_timeout',
default=3600, default=3600,
deprecated_name='sql_idle_timeout',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE')], group='DEFAULT'),
help='timeout before idle sql connections are reaped'), cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size', cfg.IntOpt('min_pool_size',
default=1, default=1,
deprecated_name='sql_min_pool_size',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')], group='DATABASE')],
help='Minimum number of SQL connections to keep open in a ' help='Minimum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('max_pool_size', cfg.IntOpt('max_pool_size',
default=None, default=None,
deprecated_name='sql_max_pool_size',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')], group='DATABASE')],
help='Maximum number of SQL connections to keep open in a ' help='Maximum number of SQL connections to keep open in a '
'pool'), 'pool'),
cfg.IntOpt('max_retries', cfg.IntOpt('max_retries',
default=10, default=10,
deprecated_name='sql_max_retries',
deprecated_group=DEFAULT,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')], group='DATABASE')],
help='maximum db connection retries during startup. ' help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'), '(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval', cfg.IntOpt('retry_interval',
default=10, default=10,
deprecated_name='sql_retry_interval', deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
deprecated_group=DEFAULT, group='DEFAULT'),
deprecated_opts=[cfg.DeprecatedOpt('reconnect_interval', cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')], group='DATABASE')],
help='interval between retries of opening a sql connection'), help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow', cfg.IntOpt('max_overflow',
default=None, default=None,
deprecated_name='sql_max_overflow', deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
deprecated_group=DEFAULT, group='DEFAULT'),
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_max_overflow', cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')], group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'), help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug', cfg.IntOpt('connection_debug',
default=0, default=0,
deprecated_name='sql_connection_debug', deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
deprecated_group=DEFAULT, group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, ' help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'), '100=Everything'),
cfg.BoolOpt('connection_trace', cfg.BoolOpt('connection_trace',
default=False, default=False,
deprecated_name='sql_connection_trace', deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
deprecated_group=DEFAULT, group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'), help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout', cfg.IntOpt('pool_timeout',
default=None, default=None,
deprecated_name='sqlalchemy_pool_timeout', deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
deprecated_group='DATABASE', group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'), help='If set, use this value for pool_timeout with sqlalchemy'),
] ]
@ -411,8 +462,8 @@ class SqliteForeignKeysListener(PoolListener):
dbapi_con.execute('pragma foreign_keys=ON') dbapi_con.execute('pragma foreign_keys=ON')
def get_session(autocommit=True, expire_on_commit=False, def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
sqlite_fk=False, slave_session=False): slave_session=False, mysql_traditional_mode=False):
"""Return a SQLAlchemy session.""" """Return a SQLAlchemy session."""
global _MAKER global _MAKER
global _SLAVE_MAKER global _SLAVE_MAKER
@ -422,7 +473,8 @@ def get_session(autocommit=True, expire_on_commit=False,
maker = _SLAVE_MAKER maker = _SLAVE_MAKER
if maker is None: if maker is None:
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session) engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
mysql_traditional_mode=mysql_traditional_mode)
maker = get_maker(engine, autocommit, expire_on_commit) maker = get_maker(engine, autocommit, expire_on_commit)
if slave_session: if slave_session:
@ -441,6 +493,11 @@ def get_session(autocommit=True, expire_on_commit=False,
# 1 column - (IntegrityError) column c1 is not unique # 1 column - (IntegrityError) column c1 is not unique
# N columns - (IntegrityError) column c1, c2, ..., N are not unique # N columns - (IntegrityError) column c1, c2, ..., N are not unique
# #
# sqlite since 3.7.16:
# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
#
# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
#
# postgres: # postgres:
# 1 column - (IntegrityError) duplicate key value violates unique # 1 column - (IntegrityError) duplicate key value violates unique
# constraint "users_c1_key" # constraint "users_c1_key"
@ -453,9 +510,10 @@ def get_session(autocommit=True, expire_on_commit=False,
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'") # with -' for key 'name_of_our_constraint'")
_DUP_KEY_RE_DB = { _DUP_KEY_RE_DB = {
"sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
"postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"), re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$") "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
} }
@ -480,13 +538,22 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
if engine_name not in ["mysql", "sqlite", "postgresql"]: if engine_name not in ["mysql", "sqlite", "postgresql"]:
return return
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message) # FIXME(johannes): The usage of the .message attribute has been
if not m: # deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
for pattern in _DUP_KEY_RE_DB[engine_name]:
match = pattern.match(integrity_error.message)
if match:
break
else:
return return
columns = m.group(1)
columns = match.group(1)
if engine_name == "sqlite": if engine_name == "sqlite":
columns = columns.strip().split(", ") columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
else: else:
columns = get_columns_from_uniq_cons_or_name(columns) columns = get_columns_from_uniq_cons_or_name(columns)
raise exception.DBDuplicateEntry(columns, integrity_error) raise exception.DBDuplicateEntry(columns, integrity_error)
@ -512,6 +579,11 @@ def _raise_if_deadlock_error(operational_error, engine_name):
re = _DEADLOCK_RE_DB.get(engine_name) re = _DEADLOCK_RE_DB.get(engine_name)
if re is None: if re is None:
return return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message) m = re.match(operational_error.message)
if not m: if not m:
return return
@ -519,19 +591,21 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f): def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(*args, **kwargs): def _wrap(*args, **kwargs):
try: try:
return f(*args, **kwargs) return f(*args, **kwargs)
except UnicodeEncodeError: except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter() raise exception.DBInvalidUnicodeParameter()
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.OperationalError as e: except sqla_exc.OperationalError as e:
_raise_if_db_connection_lost(e, get_engine())
_raise_if_deadlock_error(e, get_engine().name) _raise_if_deadlock_error(e, get_engine().name)
# NOTE(comstud): A lot of code is checking for OperationalError # NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now. # so let's not wrap it for now.
raise raise
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.IntegrityError as e: except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different # note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example # DBs so we must do this. Also in some tables (for example
@ -543,11 +617,11 @@ def _wrap_db_error(f):
except Exception as e: except Exception as e:
LOG.exception(_('DB exception wrapped.')) LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e) raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap return _wrap
def get_engine(sqlite_fk=False, slave_engine=False): def get_engine(sqlite_fk=False, slave_engine=False,
mysql_traditional_mode=False):
"""Return a SQLAlchemy engine.""" """Return a SQLAlchemy engine."""
global _ENGINE global _ENGINE
global _SLAVE_ENGINE global _SLAVE_ENGINE
@ -559,8 +633,8 @@ def get_engine(sqlite_fk=False, slave_engine=False):
db_uri = CONF.database.slave_connection db_uri = CONF.database.slave_connection
if engine is None: if engine is None:
engine = create_engine(db_uri, engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
sqlite_fk=sqlite_fk) mysql_traditional_mode=mysql_traditional_mode)
if slave_engine: if slave_engine:
_SLAVE_ENGINE = engine _SLAVE_ENGINE = engine
else: else:
@ -583,44 +657,77 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp) dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record): def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed. """Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch. implemented by C libraries that eventlet cannot monkey patch.
""" """
greenthread.sleep(0) time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy): def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL connections checked out of the pool are alive. """Ensures that MySQL and DB2 connections are alive.
Borrowed from: Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
""" """
cursor = dbapi_conn.cursor()
try: try:
dbapi_conn.cursor().execute('select 1') ping_sql = 'select 1'
except dbapi_conn.OperationalError as ex: if engine.name == 'ibm_db_sa':
if ex.args[0] in (2006, 2013, 2014, 2045, 2055): # DB2 requires a table expression
LOG.warn(_('Got mysql server has gone away: %s'), ex) ping_sql = 'select 1 from (values (1)) AS t1'
raise sqla_exc.DisconnectionError("Database server went away") cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex
LOG.warning(msg)
raise sqla_exc.DisconnectionError(msg)
else: else:
raise raise
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
"""Set engine mode to 'traditional'.
Required to prevent silent truncates at insert or update operations
under MySQL. By default MySQL truncates inserted string if it longer
than a declared field just with warning. That is fraught with data
corruption.
"""
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
def _is_db_connection_error(args): def _is_db_connection_error(args):
"""Return True if error in connecting to db.""" """Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended # NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others. # to support Postgres and others.
conn_err_codes = ('2002', '2003', '2006') # For the db2, the error code is -30081 since the db2 is still not ready
conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
for err_code in conn_err_codes: for err_code in conn_err_codes:
if args.find(err_code) != -1: if args.find(err_code) != -1:
return True return True
return False return False
def create_engine(sql_connection, sqlite_fk=False): def _raise_if_db_connection_lost(error, engine):
# NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
# requires connection and cursor in incoming parameters,
# but we have no possibility to create connection if DB
# is not available, so in such case reconnect fails.
# But is_disconnect() ignores these parameters, so it
# makes sense to pass to function None as placeholder
# instead of connection and cursor.
if engine.dialect.is_disconnect(error, None, None):
raise exception.DBConnectionError(error)
def create_engine(sql_connection, sqlite_fk=False,
mysql_traditional_mode=False):
"""Return a new SQLAlchemy engine.""" """Return a new SQLAlchemy engine."""
# NOTE(geekinutah): At this point we could be connecting to the normal # NOTE(geekinutah): At this point we could be connecting to the normal
# db handle or the slave db handle. Things like # db handle or the slave db handle. Things like
@ -659,10 +766,21 @@ def create_engine(sql_connection, sqlite_fk=False):
engine = sqlalchemy.create_engine(sql_connection, **engine_args) engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield) sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername: if engine.name in ['mysql', 'ibm_db_sa']:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener) callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
if engine.name == 'mysql':
if mysql_traditional_mode:
sqlalchemy.event.listen(engine, 'checkout',
_set_mode_traditional)
else:
LOG.warning(_("This application has not enabled MySQL "
"traditional mode, which means silent "
"data corruption may occur. "
"Please encourage the application "
"developers to enable this mode."))
elif 'sqlite' in connection_dict.drivername: elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous: if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect', sqlalchemy.event.listen(engine, 'connect',
@ -684,7 +802,7 @@ def create_engine(sql_connection, sqlite_fk=False):
remaining = 'infinite' remaining = 'infinite'
while True: while True:
msg = _('SQL connection failed. %s attempts left.') msg = _('SQL connection failed. %s attempts left.')
LOG.warn(msg % remaining) LOG.warning(msg % remaining)
if remaining != 'infinite': if remaining != 'infinite':
remaining -= 1 remaining -= 1
time.sleep(CONF.database.retry_interval) time.sleep(CONF.database.retry_interval)
@ -743,25 +861,25 @@ def _patch_mysqldb_with_stacktrace_comments():
def _do_query(self, q): def _do_query(self, q):
stack = '' stack = ''
for file, line, method, function in traceback.extract_stack(): for filename, line, method, function in traceback.extract_stack():
# exclude various common things from trace # exclude various common things from trace
if file.endswith('session.py') and method == '_do_query': if filename.endswith('session.py') and method == '_do_query':
continue continue
if file.endswith('api.py') and method == 'wrapper': if filename.endswith('api.py') and method == 'wrapper':
continue continue
if file.endswith('utils.py') and method == '_inner': if filename.endswith('utils.py') and method == '_inner':
continue continue
if file.endswith('exception.py') and method == '_wrap': if filename.endswith('exception.py') and method == '_wrap':
continue continue
# db/api is just a wrapper around db/sqlalchemy/api # db/api is just a wrapper around db/sqlalchemy/api
if file.endswith('db/api.py'): if filename.endswith('db/api.py'):
continue continue
# only trace inside neutron # only trace inside neutron
index = file.rfind('neutron') index = filename.rfind('neutron')
if index == -1: if index == -1:
continue continue
stack += "File:%s:%s Method:%s() Line:%s | " \ stack += "File:%s:%s Method:%s() Line:%s | " \
% (file[index:], line, method, function) % (filename[index:], line, method, function)
# strip trailing " | " from stack # strip trailing " | " from stack
if stack: if stack:

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# Copyright 2010-2011 OpenStack Foundation. # Copyright 2010-2011 OpenStack Foundation.
@ -18,16 +16,42 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Implementation of paginate query.""" import logging
import re
from migrate.changeset import UniqueConstraint
import sqlalchemy import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging from neutron.openstack.common import timeutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
class InvalidSortKey(Exception): class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.") message = _("Sort key supplied was not valid.")
@ -69,7 +93,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
if 'id' not in sort_keys: if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check # TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id # the actual primary key, rather than assuming its id
LOG.warn(_('Id not in sort_keys; is sort_keys unique?')) LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs)) assert(not (sort_dir and sort_dirs))
@ -85,11 +109,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Add sorting # Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
sort_dir_func = { try:
'asc': sqlalchemy.asc, sort_dir_func = {
'desc': sqlalchemy.desc, 'asc': sqlalchemy.asc,
}[current_sort_dir] 'desc': sqlalchemy.desc,
}[current_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
try: try:
sort_key_attr = getattr(model, current_sort_key) sort_key_attr = getattr(model, current_sort_key)
except AttributeError: except AttributeError:
@ -105,20 +132,17 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Build up an array of sort criteria as in the docstring # Build up an array of sort criteria as in the docstring
criteria_list = [] criteria_list = []
for i in range(0, len(sort_keys)): for i in range(len(sort_keys)):
crit_attrs = [] crit_attrs = []
for j in range(0, i): for j in range(i):
model_attr = getattr(model, sort_keys[j]) model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j])) crit_attrs.append((model_attr == marker_values[j]))
model_attr = getattr(model, sort_keys[i]) model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc': if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i])) crit_attrs.append((model_attr < marker_values[i]))
elif sort_dirs[i] == 'asc':
crit_attrs.append((model_attr > marker_values[i]))
else: else:
raise ValueError(_("Unknown sort direction, " crit_attrs.append((model_attr > marker_values[i]))
"must be 'desc' or 'asc'"))
criteria = sqlalchemy.sql.and_(*crit_attrs) criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria) criteria_list.append(criteria)
@ -130,3 +154,394 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
query = query.limit(limit) query = query.limit(limit)
return query return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
metadata = MetaData()
metadata.bind = engine
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
except KeyError:
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise ColumnError(msg % column_name)
return column
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
type with NullType in metadata. We process these columns and replace
NullType with the correct column type.
:param migrate_engine: sqlalchemy engine
:param table_name: name of table that contains uniq constraint.
:param uc_name: name of uniq constraint that will be dropped.
:param columns: columns that are in uniq constraint.
:param col_name_col_instance: contains pair column_name=column_instance.
column_instance is instance of Column. These params
are required only for columns that have unsupported
types by sqlite. For example BigInteger.
"""
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
if migrate_engine.name == "sqlite":
override_cols = [
_get_not_supported_column(col_name_col_instance, col.name)
for col in t.columns
if isinstance(col.type, NullType)
]
for col in override_cols:
t.columns.replace(col)
uc = UniqueConstraint(*columns, table=t, name=uc_name)
uc.drop()
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
This method drop (or mark ad `deleted` if use_soft_delete is True) old
duplicate rows form table with name `table_name`.
:param migrate_engine: Sqlalchemy engine
:param table_name: Table with duplicates
:param use_soft_delete: If True - values will be marked as `deleted`,
if False - values will be removed from table
:param uc_column_names: Unique constraint columns
"""
meta = MetaData()
meta.bind = migrate_engine
table = Table(table_name, meta, autoload=True)
columns_for_group_by = [table.c[name] for name in uc_column_names]
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
migrate_engine.execute(delete_statement)
def _get_default_deleted_value(table):
if isinstance(table.c.id.type, Integer):
return 0
if isinstance(table.c.id.type, String):
return ""
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
table = get_table(migrate_engine, table_name)
insp = reflection.Inspector.from_engine(migrate_engine)
real_indexes = insp.get_indexes(table_name)
existing_index_names = dict(
[(index['name'], index['column_names']) for index in real_indexes])
# NOTE(boris-42): Restore indexes on `deleted` column
for index in indexes:
if 'deleted' not in index['column_names']:
continue
name = index['name']
if name in existing_index_names:
column_names = [table.c[c] for c in existing_index_names[name]]
old_index = Index(name, *column_names, unique=index["unique"])
old_index.drop(migrate_engine)
column_names = [table.c[c] for c in index['column_names']]
new_index = Index(index["name"], *column_names, unique=index["unique"])
new_index.create(migrate_engine)
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_boolean_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
old_deleted = Column('old_deleted', Boolean, default=False)
old_deleted.create(table, populate_default=False)
table.update().\
where(table.c.deleted == table.c.id).\
values(old_deleted=True).\
execute()
table.c.deleted.drop()
table.c.old_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
**col_name_col_instance):
insp = reflection.Inspector.from_engine(migrate_engine)
table = get_table(migrate_engine, table_name)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', Boolean, default=0)
columns.append(column_copy)
constraints = [constraint.copy() for constraint in table.constraints]
meta = table.metadata
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
c_select = []
for c in table.c:
if c.name != "deleted":
c_select.append(c)
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
new_table.update().\
where(new_table.c.deleted == new_table.c.id).\
values(deleted=True).\
execute()
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_id_type_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
new_deleted = Column('new_deleted', table.c.id.type,
default=_get_default_deleted_value(table))
new_deleted.create(table, populate_default=True)
deleted = True # workaround for pyflakes
table.update().\
where(table.c.deleted == deleted).\
values(new_deleted=table.c.id).\
execute()
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
**col_name_col_instance):
# NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
# constraints in sqlite DB and our `deleted` column has
# 2 check constraints. So there is only one way to remove
# these constraints:
# 1) Create new table with the same columns, constraints
# and indexes. (except deleted column).
# 2) Copy all data from old to new table.
# 3) Drop old table.
# 4) Rename new table to old table name.
insp = reflection.Inspector.from_engine(migrate_engine)
meta = MetaData(bind=migrate_engine)
table = Table(table_name, meta, autoload=True)
default_deleted_value = _get_default_deleted_value(table)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
def is_deleted_column_constraint(constraint):
# NOTE(boris-42): There is no other way to check is CheckConstraint
# associated with deleted column.
if not isinstance(constraint, CheckConstraint):
return False
sqltext = str(constraint.sqltext)
return (sqltext.endswith("deleted in (0, 1)") or
sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
constraints = []
for constraint in table.constraints:
if not is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
ins = InsertFromSelect(new_table, table.select())
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
deleted = True # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=new_table.c.id).\
execute()
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=default_deleted_value).\
execute()
def get_connect_string(backend, database, user=None, passwd=None):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
"""
args = {'backend': backend,
'user': user,
'passwd': passwd,
'database': database}
if backend == 'sqlite':
template = '%(backend)s:///%(database)s'
else:
template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
return template % args
def is_backend_avail(backend, database, user=None, passwd=None):
try:
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
else:
connection.close()
engine.dispose()
return True
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)