Merge "Introduce conductor touch while offline"
This commit is contained in:
commit
665f061755
@ -321,13 +321,16 @@ class BaseConductorManager(object):
|
|||||||
# This is only used in tests currently. Delete it?
|
# This is only used in tests currently. Delete it?
|
||||||
self._periodic_task_callables = periodic_task_callables
|
self._periodic_task_callables = periodic_task_callables
|
||||||
|
|
||||||
|
def keepalive_halt(self):
|
||||||
|
self._keepalive_evt.set()
|
||||||
|
|
||||||
def del_host(self, deregister=True, clear_node_reservations=True):
|
def del_host(self, deregister=True, clear_node_reservations=True):
|
||||||
# Conductor deregistration fails if called on non-initialized
|
# Conductor deregistration fails if called on non-initialized
|
||||||
# conductor (e.g. when rpc server is unreachable).
|
# conductor (e.g. when rpc server is unreachable).
|
||||||
if not hasattr(self, 'conductor'):
|
if not hasattr(self, 'conductor'):
|
||||||
return
|
return
|
||||||
self._shutdown = True
|
self._shutdown = True
|
||||||
self._keepalive_evt.set()
|
self.keepalive_halt()
|
||||||
|
|
||||||
if clear_node_reservations:
|
if clear_node_reservations:
|
||||||
# clear all locks held by this conductor before deregistering
|
# clear all locks held by this conductor before deregistering
|
||||||
@ -469,7 +472,7 @@ class BaseConductorManager(object):
|
|||||||
return
|
return
|
||||||
while not self._keepalive_evt.is_set():
|
while not self._keepalive_evt.is_set():
|
||||||
try:
|
try:
|
||||||
self.conductor.touch()
|
self.conductor.touch(online=not self._shutdown)
|
||||||
except db_exception.DBConnectionError:
|
except db_exception.DBConnectionError:
|
||||||
LOG.warning('Conductor could not connect to database '
|
LOG.warning('Conductor could not connect to database '
|
||||||
'while heartbeating.')
|
'while heartbeating.')
|
||||||
|
@ -585,10 +585,16 @@ class Connection(object, metaclass=abc.ABCMeta):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def touch_conductor(self, hostname):
|
def touch_conductor(self, hostname, online=True):
|
||||||
"""Mark a conductor as active by updating its 'updated_at' property.
|
"""Mark a conductor as active by updating its 'updated_at' property.
|
||||||
|
|
||||||
|
Calling periodically with ``online=False`` will result in the conductor
|
||||||
|
appearing unregistered, but recently enough to prevent other conductors
|
||||||
|
failing orphan nodes. This improves the behaviour of graceful and drain
|
||||||
|
shutdown.
|
||||||
|
|
||||||
:param hostname: The hostname of this conductor service.
|
:param hostname: The hostname of this conductor service.
|
||||||
|
:param online: Whether the conductor is online.
|
||||||
:raises: ConductorNotFound
|
:raises: ConductorNotFound
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -1392,13 +1392,13 @@ class Connection(api.Connection):
|
|||||||
raise exception.ConductorNotFound(conductor=hostname)
|
raise exception.ConductorNotFound(conductor=hostname)
|
||||||
|
|
||||||
@oslo_db_api.retry_on_deadlock
|
@oslo_db_api.retry_on_deadlock
|
||||||
def touch_conductor(self, hostname):
|
def touch_conductor(self, hostname, online=True):
|
||||||
with _session_for_write() as session:
|
with _session_for_write() as session:
|
||||||
query = sa.update(models.Conductor).where(
|
query = sa.update(models.Conductor).where(
|
||||||
models.Conductor.hostname == hostname
|
models.Conductor.hostname == hostname
|
||||||
).values({
|
).values({
|
||||||
'updated_at': timeutils.utcnow(),
|
'updated_at': timeutils.utcnow(),
|
||||||
'online': True}
|
'online': online}
|
||||||
).execution_options(synchronize_session=False)
|
).execution_options(synchronize_session=False)
|
||||||
res = session.execute(query)
|
res = session.execute(query)
|
||||||
count = res.rowcount
|
count = res.rowcount
|
||||||
|
@ -111,9 +111,9 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
|
|||||||
# methods can be used in the future to replace current explicit RPC calls.
|
# methods can be used in the future to replace current explicit RPC calls.
|
||||||
# Implications of calling new remote procedures should be thought through.
|
# Implications of calling new remote procedures should be thought through.
|
||||||
# @object_base.remotable
|
# @object_base.remotable
|
||||||
def touch(self, context=None):
|
def touch(self, context=None, online=True):
|
||||||
"""Touch this conductor's DB record, marking it as up-to-date."""
|
"""Touch this conductor's DB record, marking it as up-to-date."""
|
||||||
self.dbapi.touch_conductor(self.hostname)
|
self.dbapi.touch_conductor(self.hostname, online=online)
|
||||||
|
|
||||||
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
|
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
|
||||||
# methods can be used in the future to replace current explicit RPC calls.
|
# methods can be used in the future to replace current explicit RPC calls.
|
||||||
|
@ -359,7 +359,7 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
|||||||
mock_is_sqlite.return_value = False
|
mock_is_sqlite.return_value = False
|
||||||
self.service._conductor_service_record_keepalive()
|
self.service._conductor_service_record_keepalive()
|
||||||
self.assertEqual(1, mock_is_sqlite.call_count)
|
self.assertEqual(1, mock_is_sqlite.call_count)
|
||||||
mock_touch.assert_called_once_with(self.hostname)
|
mock_touch.assert_called_once_with(self.hostname, online=True)
|
||||||
|
|
||||||
@mock.patch.object(common_utils, 'is_ironic_using_sqlite', autospec=True)
|
@mock.patch.object(common_utils, 'is_ironic_using_sqlite', autospec=True)
|
||||||
def test__conductor_service_record_keepalive_failed_db_conn(
|
def test__conductor_service_record_keepalive_failed_db_conn(
|
||||||
|
@ -156,6 +156,21 @@ class DbConductorTestCase(base.DbTestCase):
|
|||||||
c = self.dbapi.get_conductor(c.hostname)
|
c = self.dbapi.get_conductor(c.hostname)
|
||||||
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
|
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
|
||||||
|
|
||||||
|
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||||
|
def test_touch_conductor_offline(self, mock_utcnow):
|
||||||
|
test_time = datetime.datetime(2000, 1, 1, 0, 0)
|
||||||
|
mock_utcnow.return_value = test_time
|
||||||
|
c = self._create_test_cdr()
|
||||||
|
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
|
||||||
|
|
||||||
|
test_time = datetime.datetime(2000, 1, 1, 0, 1)
|
||||||
|
mock_utcnow.return_value = test_time
|
||||||
|
self.dbapi.touch_conductor(c.hostname, online=False)
|
||||||
|
self.assertRaises(
|
||||||
|
exception.ConductorNotFound,
|
||||||
|
self.dbapi.get_conductor,
|
||||||
|
c.hostname)
|
||||||
|
|
||||||
def test_touch_conductor_not_found(self):
|
def test_touch_conductor_not_found(self):
|
||||||
# A conductor's heartbeat will not create a new record,
|
# A conductor's heartbeat will not create a new record,
|
||||||
# it will only update existing ones
|
# it will only update existing ones
|
||||||
|
@ -77,7 +77,7 @@ class TestConductorObject(db_base.DbTestCase):
|
|||||||
c = objects.Conductor.get_by_hostname(self.context, host)
|
c = objects.Conductor.get_by_hostname(self.context, host)
|
||||||
c.touch(self.context)
|
c.touch(self.context)
|
||||||
mock_get_cdr.assert_called_once_with(host, online=True)
|
mock_get_cdr.assert_called_once_with(host, online=True)
|
||||||
mock_touch_cdr.assert_called_once_with(host)
|
mock_touch_cdr.assert_called_once_with(host, online=True)
|
||||||
|
|
||||||
def test_refresh(self):
|
def test_refresh(self):
|
||||||
host = self.fake_conductor['hostname']
|
host = self.fake_conductor['hostname']
|
||||||
|
Loading…
x
Reference in New Issue
Block a user