Move to oslo_db
Replace the SQLAlchemy engine and session management with an oslo.db
facade. The facade does the same work transparently and fixes the
existing connection mismanagement. The changes involve:

- modifying the sqlalchemy session module to use oslo_db functions
- adding the oslo_db engine configuration parameters
- tweaking the fake tests to pass*

* The fake tests are sensitive to sleep calls, and oslo_db uses a
  sleep(0) hack that was tripping the fake scheduler's "Sleeping for
  no reason." check. Test coverage was not affected.

Implements: blueprint move-to-oslo-db
Change-Id: I59f1fc0fe1551b9a815a51e6a620fabf060b013f
commit 27a4e2c650
parent 4fcc22b06e
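At its core, the change swaps Trove's hand-rolled _ENGINE/_MAKER globals
for oslo.db's EngineFacade. A minimal sketch of the adopted pattern (the
connection URL here is hypothetical):

    from oslo_db.sqlalchemy import session

    # The facade owns both the engine and the sessionmaker, so pooling,
    # reconnects, and session lifecycle are handled in one place by oslo.db.
    facade = session.EngineFacade('mysql://user:secret@localhost/trove')
    engine = facade.get_engine()       # a plain SQLAlchemy Engine
    db_session = facade.get_session()  # a regular SQLAlchemy Session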
@@ -1,5 +1,5 @@
 HTTP/1.1 200 OK
 Content-Type: application/json
-Content-Length: 442
+Content-Length: 929
 Date: Mon, 18 Mar 2013 19:09:17 GMT
 
@@ -16,7 +16,23 @@
             "size": 0.14,
             "status": "COMPLETED",
             "updated": "2014-10-30T12:30:00"
+        },
+        {
+            "created": "2014-10-30T12:30:00",
+            "datastore": {
+                "type": "mysql",
+                "version": "5.5",
+                "version_id": "b00000b0-00b0-0b00-00b0-000b000000bb"
+            },
+            "description": "My Incremental Backup",
+            "id": "2e351a71-dd28-4bcb-a7d6-d36a5b487173",
+            "instance_id": "44b277eb-39be-4921-be31-3d61b43651d7",
+            "locationRef": "http://localhost/path/to/backup",
+            "name": "Incremental Snapshot",
+            "parent_id": "a9832168-7541-4536-b8d9-a8a9b79cf1b4",
+            "size": 0.14,
+            "status": "COMPLETED",
             "updated": "2014-10-30T12:30:00"
         }
     ]
 }
@@ -1,5 +1,5 @@
 HTTP/1.1 200 OK
 Content-Type: application/json
-Content-Length: 442
+Content-Length: 929
 Date: Mon, 18 Mar 2013 19:09:17 GMT
 
@@ -16,7 +16,23 @@
             "size": 0.14,
             "status": "COMPLETED",
             "updated": "2014-10-30T12:30:00"
+        },
+        {
+            "created": "2014-10-30T12:30:00",
+            "datastore": {
+                "type": "mysql",
+                "version": "5.5",
+                "version_id": "b00000b0-00b0-0b00-00b0-000b000000bb"
+            },
+            "description": "My Incremental Backup",
+            "id": "2e351a71-dd28-4bcb-a7d6-d36a5b487173",
+            "instance_id": "44b277eb-39be-4921-be31-3d61b43651d7",
+            "locationRef": "http://localhost/path/to/backup",
+            "name": "Incremental Snapshot",
+            "parent_id": "a9832168-7541-4536-b8d9-a8a9b79cf1b4",
+            "size": 0.14,
+            "status": "COMPLETED",
             "updated": "2014-10-30T12:30:00"
         }
     ]
 }
@@ -42,3 +42,4 @@ stevedore>=1.5.0 # Apache-2.0
 oslo.messaging!=2.8.0,>2.6.1 # Apache-2.0
 osprofiler>=0.3.0 # Apache-2.0
 oslo.log>=1.12.0 # Apache-2.0
+oslo.db>=4.1.0 # Apache-2.0
@@ -424,7 +424,46 @@ database_opts = [
     cfg.BoolOpt('query_log',
                 default=False,
                 deprecated_name='sql_query_log',
-                deprecated_group='DEFAULT'),
+                deprecated_group='DEFAULT',
+                deprecated_for_removal=True),
+    cfg.BoolOpt('sqlite_synchronous',
+                default=True,
+                help='If True, SQLite uses synchronous mode.'),
+    cfg.StrOpt('slave_connection',
+               secret=True,
+               help='The SQLAlchemy connection string to use to connect to the'
+                    ' slave database.'),
+    cfg.StrOpt('mysql_sql_mode',
+               default='TRADITIONAL',
+               help='The SQL mode to be used for MySQL sessions. '
+                    'This option, including the default, overrides any '
+                    'server-set SQL mode. To use whatever SQL mode '
+                    'is set by the server configuration, '
+                    'set this to no value. Example: mysql_sql_mode='),
+    cfg.IntOpt('max_pool_size',
+               help='Maximum number of SQL connections to keep open in a '
+                    'pool.'),
+    cfg.IntOpt('max_retries',
+               default=10,
+               help='Maximum number of database connection retries '
+                    'during startup. Set to -1 to specify an infinite '
+                    'retry count.'),
+    cfg.IntOpt('retry_interval',
+               default=10,
+               help='Interval between retries of opening a SQL connection.'),
+    cfg.IntOpt('max_overflow',
+               help='If set, use this value for max_overflow with '
+                    'SQLAlchemy.'),
+    cfg.IntOpt('connection_debug',
+               default=0,
+               help='Verbosity of SQL debugging information: 0=None, '
+                    '100=Everything.'),
+    cfg.BoolOpt('connection_trace',
+                default=False,
+                help='Add Python stack traces to SQL as comment strings.'),
+    cfg.IntOpt('pool_timeout',
+               help='If set, use this value for pool_timeout with '
+                    'SQLAlchemy.'),
 ]
 
 
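These options mirror oslo.db's standard engine options; _create_facade()
(further down) forwards them to the EngineFacade as keyword arguments. A
hedged sketch of that flow, mirroring the commit's own dict(CONF.database)
approach:

    from oslo_db.sqlalchemy import session
    from trove.common import cfg

    CONF = cfg.CONF

    # Copy the [database] group into facade kwargs, dropping the
    # deprecated 'query_log' flag just as _create_facade() does.
    database_opts = dict(CONF.database)
    database_opts.pop('query_log', None)
    facade = session.EngineFacade(CONF.database.connection,
                                  **database_opts)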
@@ -14,20 +14,18 @@
 # under the License.
 
 import contextlib
+import threading
 
+from oslo_db.sqlalchemy import session
 from oslo_log import log as logging
-import osprofiler.sqlalchemy
-import sqlalchemy
-from sqlalchemy import create_engine
 from sqlalchemy import MetaData
-from sqlalchemy.orm import sessionmaker
 
 from trove.common import cfg
 from trove.common.i18n import _
 from trove.db.sqlalchemy import mappers
 
-_ENGINE = None
-_MAKER = None
+_FACADE = None
+_LOCK = threading.Lock()
 
 
 LOG = logging.getLogger(__name__)
@@ -36,11 +34,9 @@ CONF = cfg.CONF
 
 
 def configure_db(options, models_mapper=None):
-    global _ENGINE
-    if not _ENGINE:
-        _ENGINE = _create_engine(options)
+    facade = _create_facade(options)
     if models_mapper:
-        models_mapper.map(_ENGINE)
+        models_mapper.map(facade)
     else:
         from trove.backup import models as backup_models
         from trove.cluster import models as cluster_models
@@ -71,44 +67,69 @@ def configure_db(options, models_mapper=None):
     models = {}
     for module in model_modules:
         models.update(module.persisted_models())
-    mappers.map(_ENGINE, models)
+    mappers.map(get_engine(), models)
 
 
-def _create_engine(options):
-    engine_args = {
-        "pool_recycle": CONF.database.idle_timeout,
-        "echo": CONF.database.query_log
-    }
-    LOG.info(_("Creating SQLAlchemy engine with args: %s") % engine_args)
-    db_engine = create_engine(options['database']['connection'], **engine_args)
-    if CONF.profiler.enabled and CONF.profiler.trace_sqlalchemy:
-        osprofiler.sqlalchemy.add_tracing(sqlalchemy, db_engine, "db")
-    return db_engine
+def _create_facade(options):
+    global _LOCK, _FACADE
+    # TODO(mvandijk): Refactor this once oslo.db spec is implemented:
+    # https://specs.openstack.org/openstack/oslo-specs/specs/kilo/
+    # make-enginefacade-a-facade.html
+    if _FACADE is None:
+        with _LOCK:
+            if _FACADE is None:
+                conf = CONF.database
+                # pop the deprecated config option 'query_log'
+                if conf.query_log:
+                    if conf.connection_debug < 50:
+                        conf['connection_debug'] = 50
+                    LOG.warning(_('Configuration option "query_log" has been '
+                                  'deprecated. Use "connection_debug" '
+                                  'instead. Setting connection_debug = '
+                                  '%(debug_level)s.')
+                                % {'debug_level':
+                                   conf.get('connection_debug')})
+                # TODO(mvandijk): once query_log is removed,
+                # use enginefacade.from_config() instead
+                database_opts = dict(CONF.database)
+                database_opts.pop('query_log')
+                _FACADE = session.EngineFacade(
+                    options['database']['connection'],
+                    **database_opts
+                )
+    return _FACADE
 
 
-def get_session(autocommit=True, expire_on_commit=False):
-    """Helper method to grab session."""
-    global _MAKER, _ENGINE
-    if not _MAKER:
-        if not _ENGINE:
-            msg = "***The Database has not been setup!!!***"
-            LOG.exception(msg)
-            raise RuntimeError(msg)
-        _MAKER = sessionmaker(bind=_ENGINE,
-                              autocommit=autocommit,
-                              expire_on_commit=expire_on_commit)
-    return _MAKER()
+def _check_facade():
+    if _FACADE is None:
+        msg = _("***The Database has not been setup!!!***")
+        LOG.exception(msg)
+        raise RuntimeError(msg)
 
 
-def raw_query(model, autocommit=True, expire_on_commit=False):
-    return get_session(autocommit, expire_on_commit).query(model)
+def get_facade():
+    _check_facade()
+    return _FACADE
+
+
+def get_engine(use_slave=False):
+    _check_facade()
+    return _FACADE.get_engine(use_slave=use_slave)
+
+
+def get_session(**kwargs):
+    return get_facade().get_session(**kwargs)
+
+
+def raw_query(model, **kwargs):
+    return get_session(**kwargs).query(model)
 
 
 def clean_db():
-    global _ENGINE
+    engine = get_engine()
     meta = MetaData()
-    meta.reflect(bind=_ENGINE)
-    with contextlib.closing(_ENGINE.connect()) as con:
+    meta.bind = engine
+    meta.reflect()
+    with contextlib.closing(engine.connect()) as con:
         trans = con.begin()
         for table in reversed(meta.sorted_tables):
             if table.name != "migrate_version":
@@ -117,8 +138,10 @@ def clean_db():
 
 
 def drop_db(options):
     if options:
-        engine = _create_engine(options)
+        _create_facade(options)
+    engine = get_engine()
+    meta = MetaData()
     meta.bind = engine
     meta.reflect()
     meta.drop_all()
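With the rewrite above, callers never touch an engine or sessionmaker
directly; everything goes through the facade-backed helpers. A usage
sketch under the new API (the options dict and MyModel are hypothetical):

    from trove.db.sqlalchemy import session

    # configure_db() builds the EngineFacade exactly once, guarded by _LOCK.
    session.configure_db({'database': {'connection': 'sqlite://'}})

    engine = session.get_engine()       # facade-managed SQLAlchemy Engine
    db = session.get_session(expire_on_commit=False)
    query = session.raw_query(MyModel)  # shorthand for get_session().query()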
@@ -829,7 +829,7 @@ class Backups(ActiveMixin):
         assert_equal(len(results), 1)
         self.json_backup = results[JSON_INDEX]
 
-    @test
+    @test(depends_on=[create_backup])
     def create_incremental_backup(self):
         set_fake_stuff(uuid=EXAMPLE_BACKUP_INCREMENTAL_ID)
         results = self.snippet(
@@ -844,9 +844,9 @@ class Backups(ActiveMixin):
 
         self._wait_for_active("BACKUP")
         assert_equal(len(results), 1)
-        self.json_backup = results[JSON_INDEX]
+        self.json_backup2 = results[JSON_INDEX]
 
-    @test(depends_on=[create_backup])
+    @test(depends_on=[create_incremental_backup])
     def get_backup(self):
         results = self.snippet(
             "backup_get",
@@ -855,7 +855,7 @@ class Backups(ActiveMixin):
             lambda client: client.backups.get(self.json_backup.id))
         assert_equal(len(results), 1)
 
-    @test(depends_on=[create_backup])
+    @test(depends_on=[create_incremental_backup])
     def get_backups_for_instance(self):
         results = self.snippet(
             "backups_by_instance",
@@ -864,7 +864,7 @@ class Backups(ActiveMixin):
             lambda client: client.instances.backups(json_instance.id))
         assert_equal(len(results), 1)
 
-    @test(depends_on=[create_backup])
+    @test(depends_on=[create_incremental_backup])
    def list_backups(self):
        results = self.snippet(
            "backup_list",
@@ -236,7 +236,7 @@ class FakeGuest(object):
             status.status = rd_instance.ServiceStatuses.RUNNING
             status.save()
             AgentHeartBeat.create(instance_id=self.id)
-        eventlet.spawn_after(1.0, update_db)
+        eventlet.spawn_after(3.0, update_db)
 
     def _set_task_status(self, new_status='RUNNING'):
         from trove.instance.models import InstanceServiceStatus
@@ -317,7 +317,7 @@ class FakeGuest(object):
             backup.checksum = 'fake-md5-sum'
             backup.size = BACKUP_SIZE
             backup.save()
-        eventlet.spawn_after(1.0, finish_create_backup)
+        eventlet.spawn_after(7.5, finish_create_backup)
 
     def mount_volume(self, device_path=None, mount_point=None):
         pass
@@ -208,14 +208,15 @@ def fake_sleep(time_to_sleep):
     Puts the coroutine which calls it to sleep. If a coroutine object is not
     associated with the caller this will fail.
     """
-    global sleep_allowance
-    sleep_allowance -= 1
-    if not other_threads_are_active():
-        if sleep_allowance < -1:
-            raise RuntimeError("Sleeping for no reason.")
-        else:
-            return  # Forgive the thread for calling this for one time.
-    sleep_allowance = allowable_empty_sleeps
+    if time_to_sleep:
+        global sleep_allowance
+        sleep_allowance -= 1
+        if not other_threads_are_active():
+            if sleep_allowance < -1:
+                raise RuntimeError("Sleeping for no reason.")
+            else:
+                return  # Forgive the thread for calling this for one time.
+        sleep_allowance = allowable_empty_sleeps
 
     cr = Coroutine.get_current()
     for ft in fake_threads:
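The allowance guard is now entered only for non-zero sleeps, so oslo_db's
internal sleep(0) yield hack no longer drains sleep_allowance.
Illustratively, against the harness above:

    fake_sleep(0)    # oslo_db's sleep(0) hack: the allowance check is skipped
    fake_sleep(0.5)  # a real sleep: still decrements sleep_allowance and can
                     # raise "Sleeping for no reason." when no other fake
                     # threads are active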
@@ -18,8 +18,8 @@
 
 import re
 
+from oslo_db.sqlalchemy import session
 import pexpect
-from sqlalchemy import create_engine
 from sqlalchemy.exc import OperationalError
 try:
     from sqlalchemy.exc import ResourceClosedError
@@ -114,8 +114,10 @@ class SqlAlchemyConnection(object):
 
     @staticmethod
     def _init_engine(user, password, host):
-        return create_engine("mysql://%s:%s@%s:3306" % (user, password, host),
-                             pool_recycle=1800, echo=True)
+        return session.EngineFacade(
+            "mysql://%s:%s@%s:3306" % (user, password, host),
+            pool_recycle=1800, echo=True
+        ).get_engine()
 
 
 class PexpectMySqlConnection(object):
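Since EngineFacade.get_engine() returns a plain SQLAlchemy Engine, the
rest of SqlAlchemyConnection works unchanged; a minimal sketch (the
in-memory SQLite URL is for illustration only):

    from oslo_db.sqlalchemy import session

    engine = session.EngineFacade('sqlite://').get_engine()
    with engine.connect() as conn:
        conn.execute('SELECT 1')  # same Engine API as before the change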