Fix DB/Lock session handling issues
Prior to this fix, we have been unable to run the Metal3 CI job with SQLAlchemy's internal autocommit setting enabled. However, that setting is deprecated and needs to be removed. Investigating our DB queries and request patterns, we identified some queries which generally resulted in the underlying task and lock being held longer because the output was not actually returned — something we have had to fix in some places previously. Making some of these changes drastically reduced the number of errors encountered with the Metal3 CI job, but did not eliminate them entirely. Upon further investigation, we determined that the underlying issue occurred when we had an external semi-random reader, such as Metal3 polling endpoints: we could reach a situation where we were blocked from updating the database because, to open a write lock, we need the active readers to not be interacting with the database, and with a random reader of sorts the only realistic option we have is to enable the Write-Ahead Log[0]. We didn't have to do this with SQLAlchemy previously because the autocommit behavior hid these complexities from us, but in order to move to SQLAlchemy 2.0, we do need to remove autocommit. Additionally, this adds two unit tests for the get_node_with_token RPC method, which apparently we missed or lost somewhere along the way. It also adds notes to two database interactions to suggest we look at them in the future, as they may not be the most efficient path forward. [0]: https://www.sqlite.org/wal.html Change-Id: Iebcc15fe202910b942b58fc004d077740ec61912
This commit is contained in:
parent
5e6fa6ef30
commit
75b881bd31
@ -2284,10 +2284,13 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
LOG.debug("RPC create_port called for port %s.", port_uuid)
|
||||
|
||||
with task_manager.acquire(context, port_obj.node_id,
|
||||
purpose='port create') as task:
|
||||
purpose='port create',
|
||||
shared=True) as task:
|
||||
# NOTE(TheJulia): We're creating a port, we don't need
|
||||
# an exclusive parent lock to do so.
|
||||
utils.validate_port_physnet(task, port_obj)
|
||||
port_obj.create()
|
||||
return port_obj
|
||||
return port_obj
|
||||
|
||||
@METRICS.timer('ConductorManager.update_port')
|
||||
@messaging.expected_exceptions(exception.NodeLocked,
|
||||
@ -2373,7 +2376,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
|
||||
port_obj.save()
|
||||
|
||||
return port_obj
|
||||
return port_obj
|
||||
|
||||
@METRICS.timer('ConductorManager.update_portgroup')
|
||||
@messaging.expected_exceptions(exception.NodeLocked,
|
||||
@ -2452,7 +2455,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
|
||||
portgroup_obj.save()
|
||||
|
||||
return portgroup_obj
|
||||
return portgroup_obj
|
||||
|
||||
@METRICS.timer('ConductorManager.update_volume_connector')
|
||||
@messaging.expected_exceptions(
|
||||
@ -2496,7 +2499,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
connector.save()
|
||||
LOG.info("Successfully updated volume connector %(connector)s.",
|
||||
{'connector': connector.uuid})
|
||||
return connector
|
||||
return connector
|
||||
|
||||
@METRICS.timer('ConductorManager.update_volume_target')
|
||||
@messaging.expected_exceptions(
|
||||
@ -2537,7 +2540,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
target.save()
|
||||
LOG.info("Successfully updated volume target %(target)s.",
|
||||
{'target': target.uuid})
|
||||
return target
|
||||
return target
|
||||
|
||||
@METRICS.timer('ConductorManager.get_driver_properties')
|
||||
@messaging.expected_exceptions(exception.DriverNotFound)
|
||||
@ -3564,7 +3567,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
{'node': task.node.uuid})
|
||||
utils.add_secret_token(task.node)
|
||||
task.node.save()
|
||||
return task.node
|
||||
return objects.Node.get(context, node_id)
|
||||
|
||||
@METRICS.timer('ConductorManager.manage_node_history')
|
||||
@periodics.periodic(
|
||||
|
@ -808,7 +808,6 @@ def power_state_error_handler(e, node, power_state):
|
||||
{'node': node.uuid, 'power_state': power_state})
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def validate_port_physnet(task, port_obj):
|
||||
"""Validate the consistency of physical networks of ports in a portgroup.
|
||||
|
||||
|
@ -78,7 +78,6 @@ def update_opt_defaults():
|
||||
# This comes in two flavors
|
||||
'oslo.messaging=INFO',
|
||||
'oslo_messaging=INFO',
|
||||
'sqlalchemy=WARNING',
|
||||
'stevedore=INFO',
|
||||
'eventlet.wsgi.server=INFO',
|
||||
'iso8601=WARNING',
|
||||
|
@ -10,9 +10,28 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy import event
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP
|
||||
# since it's not compatible with SQLAlchemy 2.0
|
||||
# NOTE(dtantsur): we want sqlite as close to a real database as possible.
|
||||
enginefacade.configure(sqlite_fk=True, __autocommit=True)
|
||||
|
||||
|
||||
# NOTE(TheJulia): Setup a listener to trigger the sqlite write-ahead
|
||||
# log to be utilized to permit concurrent access, which is needed
|
||||
# as we can get read requests while we are writing via the API
|
||||
# surface *when* we're using sqlite as the database backend.
|
||||
@event.listens_for(Engine, "connect")
|
||||
def _setup_journal_mode(dbapi_connection, connection_record):
|
||||
# NOTE(TheJulia): The string may not be loaded in some unit
|
||||
# tests so handle whatever the output is as a string so we
|
||||
# can lower/compare it and send the appropriate command to
|
||||
# the database.
|
||||
if 'sqlite' in str(CONF.database.connection).lower():
|
||||
dbapi_connection.execute("PRAGMA journal_mode=WAL")
|
||||
|
@ -324,7 +324,9 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
# object is garbage collected as ORM Query objects allow
|
||||
# for DB interactions to occur after the fact, so it remains
|
||||
# connected to the DB..
|
||||
return query.all()
|
||||
# Save the query.all() results, but don't return yet, so we
|
||||
# begin to exit and unwind the session.
|
||||
ref = query.all()
|
||||
else:
|
||||
# In this case, we have a sqlalchemy.sql.selectable.Select
|
||||
# (most likely) which utilizes the unified select interface.
|
||||
@ -338,7 +340,10 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
# Everything is a tuple in a resultset from the unified interface
|
||||
# but for objects, our model expects just object access,
|
||||
# so we extract and return them.
|
||||
return [r[0] for r in res]
|
||||
ref = [r[0] for r in res]
|
||||
# Return the results to the caller, outside of the session context
|
||||
# if an ORM object, because we want the session to close.
|
||||
return ref
|
||||
|
||||
|
||||
def _filter_active_conductors(query, interval=None):
|
||||
@ -1341,6 +1346,12 @@ class Connection(api.Connection):
|
||||
|
||||
def get_active_hardware_type_dict(self, use_groups=False):
|
||||
with _session_for_read() as session:
|
||||
# TODO(TheJulia): We should likely take a look at this
|
||||
# joined query, as we may not be getting what we expect.
|
||||
# Metal3 logs upwards of 200 rows returned with multiple datetime
|
||||
# columns.
|
||||
# Given dualing datetime fields, we really can't just expect
|
||||
# requesting a unique set to "just work".
|
||||
query = (session.query(models.ConductorHardwareInterfaces,
|
||||
models.Conductor)
|
||||
.join(models.Conductor))
|
||||
@ -1375,7 +1386,8 @@ class Connection(api.Connection):
|
||||
with _session_for_read() as session:
|
||||
query = (session.query(models.ConductorHardwareInterfaces)
|
||||
.filter_by(conductor_id=conductor_id))
|
||||
return query.all()
|
||||
ref = query.all()
|
||||
return ref
|
||||
|
||||
def list_hardware_type_interfaces(self, hardware_types):
|
||||
with _session_for_read() as session:
|
||||
@ -1397,6 +1409,8 @@ class Connection(api.Connection):
|
||||
conductor_hw_iface['conductor_id'] = conductor_id
|
||||
for k, v in iface.items():
|
||||
conductor_hw_iface[k] = v
|
||||
# TODO(TheJulia): Uhh... We should try to do this as one
|
||||
# bulk operation and not insert each row.
|
||||
session.add(conductor_hw_iface)
|
||||
session.flush()
|
||||
except db_exc.DBDuplicateEntry as e:
|
||||
@ -2080,9 +2094,10 @@ class Connection(api.Connection):
|
||||
query = session.query(models.Allocation).filter_by(
|
||||
id=allocation_id)
|
||||
try:
|
||||
return query.one()
|
||||
ref = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.AllocationNotFound(allocation=allocation_id)
|
||||
return ref
|
||||
|
||||
def get_allocation_by_uuid(self, allocation_uuid):
|
||||
"""Return an allocation representation.
|
||||
@ -2095,9 +2110,10 @@ class Connection(api.Connection):
|
||||
query = session.query(models.Allocation).filter_by(
|
||||
uuid=allocation_uuid)
|
||||
try:
|
||||
return query.one()
|
||||
ref = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.AllocationNotFound(allocation=allocation_uuid)
|
||||
return ref
|
||||
|
||||
def get_allocation_by_name(self, name):
|
||||
"""Return an allocation representation.
|
||||
@ -2109,9 +2125,10 @@ class Connection(api.Connection):
|
||||
with _session_for_read() as session:
|
||||
query = session.query(models.Allocation).filter_by(name=name)
|
||||
try:
|
||||
return query.one()
|
||||
ref = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.AllocationNotFound(allocation=name)
|
||||
return ref
|
||||
|
||||
def get_allocation_list(self, filters=None, limit=None, marker=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
@ -2455,7 +2472,7 @@ class Connection(api.Connection):
|
||||
session.flush()
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.NodeHistoryAlreadyExists(uuid=values['uuid'])
|
||||
return history
|
||||
return history
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def destroy_node_history_by_uuid(self, history_uuid):
|
||||
@ -2469,9 +2486,10 @@ class Connection(api.Connection):
|
||||
def get_node_history_by_id(self, history_id):
|
||||
query = model_query(models.NodeHistory).filter_by(id=history_id)
|
||||
try:
|
||||
return query.one()
|
||||
res = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.NodeHistoryNotFound(history=history_id)
|
||||
return res
|
||||
|
||||
def get_node_history_by_uuid(self, history_uuid):
|
||||
query = model_query(models.NodeHistory).filter_by(uuid=history_uuid)
|
||||
|
@ -3568,6 +3568,23 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
|
||||
filters=mock.sentinel.filters))
|
||||
self.assertEqual([], result)
|
||||
|
||||
def test_get_node_with_token(self):
    # A freshly created node has no agent token; requesting the node
    # through the RPC method should generate and attach one.
    test_node = obj_utils.create_test_node(
        self.context,
        driver='fake-hardware',
        network_interface='noop')
    self.assertNotIn('agent_secret_token',
                     test_node.driver_internal_info)
    result = self.service.get_node_with_token(self.context,
                                              test_node.id)
    self.assertIn('agent_secret_token', result.driver_internal_info)
|
||||
|
||||
def test_node_with_token_already_set(self):
    # When a token already exists on the node, the RPC method must not
    # leak it back to the caller; the returned value is masked.
    test_node = obj_utils.create_test_node(
        self.context,
        driver='fake-hardware',
        network_interface='noop',
        driver_internal_info={'agent_secret_token': 'secret'})
    result = self.service.get_node_with_token(self.context,
                                              test_node.id)
    masked = result.driver_internal_info['agent_secret_token']
    self.assertEqual('******', masked)
|
||||
|
||||
|
||||
@mgr_utils.mock_record_keepalive
|
||||
class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
@ -3953,18 +3970,6 @@ class CreatePortTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
self.assertEqual({'foo': 'bar'}, res.extra)
|
||||
mock_validate.assert_called_once_with(mock.ANY, port)
|
||||
|
||||
def test_create_port_node_locked(self):
    # A reservation on the node simulates another conductor holding
    # the lock, so port creation is expected to fail with NodeLocked.
    locked_node = obj_utils.create_test_node(
        self.context, driver='fake-hardware',
        reservation='fake-reserv')
    port = obj_utils.get_test_port(self.context,
                                   node_id=locked_node.id)
    raised = self.assertRaises(messaging.rpc.ExpectedException,
                               self.service.create_port,
                               self.context, port)
    # The real exception is wrapped by @messaging.expected_exceptions;
    # unwrap it to compare the actual type.
    self.assertEqual(exception.NodeLocked, raised.exc_info[0])
    # The port must not have been persisted.
    self.assertRaises(exception.PortNotFound, port.get_by_uuid,
                      self.context, port.uuid)
|
||||
|
||||
@mock.patch.object(conductor_utils, 'validate_port_physnet', autospec=True)
|
||||
def test_create_port_mac_exists(self, mock_validate):
|
||||
node = obj_utils.create_test_node(self.context, driver='fake-hardware')
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Fixes issues in Ironic's use of SQLAlchemy with SQLite databases,
which is common with users like Metal3. These issues prevented Ironic
from properly supporting SQLAlchemy 2.0, as the deprecated autocommit
setting had been re-enabled to work around them.
|
Loading…
x
Reference in New Issue
Block a user