Merge "Improve graceful shutdown of conductor process"
This commit is contained in:
commit
f641463cfb
@ -74,6 +74,7 @@ class BaseConductorManager(object):
|
||||
self.topic = topic
|
||||
self.sensors_notifier = rpc.get_sensors_notifier()
|
||||
self._started = False
|
||||
self._shutdown = None
|
||||
|
||||
def init_host(self, admin_context=None):
|
||||
"""Initialize the conductor host.
|
||||
@ -91,6 +92,7 @@ class BaseConductorManager(object):
|
||||
if self._started:
|
||||
raise RuntimeError(_('Attempt to start an already running '
|
||||
'conductor manager'))
|
||||
self._shutdown = False
|
||||
|
||||
self.dbapi = dbapi.get_instance()
|
||||
|
||||
@ -262,6 +264,7 @@ class BaseConductorManager(object):
|
||||
# conductor (e.g. when rpc server is unreachable).
|
||||
if not hasattr(self, 'conductor'):
|
||||
return
|
||||
self._shutdown = True
|
||||
self._keepalive_evt.set()
|
||||
if deregister:
|
||||
try:
|
||||
@ -362,6 +365,8 @@ class BaseConductorManager(object):
|
||||
columns = ['uuid', 'driver'] + list(fields or ())
|
||||
node_list = self.dbapi.get_nodeinfo_list(columns=columns, **kwargs)
|
||||
for result in node_list:
|
||||
if self._shutdown:
|
||||
break
|
||||
if self._mapped_to_this_conductor(*result[:2]):
|
||||
yield result
|
||||
|
||||
|
@ -2147,7 +2147,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
@METRICS.timer('ConductorManager._sensors_nodes_task')
|
||||
def _sensors_nodes_task(self, context, nodes):
|
||||
"""Sends sensors data for nodes from synchronized queue."""
|
||||
while True:
|
||||
while not self._shutdown:
|
||||
try:
|
||||
node_uuid, driver, instance_uuid = nodes.get_nowait()
|
||||
except queue.Empty:
|
||||
|
@ -272,6 +272,12 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
missing_parameters_error):
|
||||
self._start_service()
|
||||
|
||||
def test_conductor_shutdown_flag(self):
|
||||
self._start_service()
|
||||
self.assertFalse(self.service._shutdown)
|
||||
self.service.del_host()
|
||||
self.assertTrue(self.service._shutdown)
|
||||
|
||||
|
||||
class CheckInterfacesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
def test__check_enabled_interfaces_success(self):
|
||||
|
@ -2848,6 +2848,19 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
|
||||
'deploying', 'provision_updated_at',
|
||||
last_error=mock.ANY)
|
||||
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
def test_iter_nodes_shutdown(self, mock_nodeinfo_list):
|
||||
self._start_service()
|
||||
self.columns = ['uuid', 'driver', 'id']
|
||||
nodes = [self._create_node(driver='fake')]
|
||||
mock_nodeinfo_list.return_value = self._get_nodeinfo_list_response(
|
||||
nodes)
|
||||
self.service._shutdown = True
|
||||
|
||||
result = list(self.service.iter_nodes(fields=['id'],
|
||||
filters=mock.sentinel.filters))
|
||||
self.assertEqual([], result)
|
||||
|
||||
|
||||
@mgr_utils.mock_record_keepalive
|
||||
class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
@ -3511,6 +3524,16 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
self.assertEqual(5, validate_mock.call_count)
|
||||
self.assertEqual(5, get_sensors_data_mock.call_count)
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
def test_send_sensor_task_shutdown(self, acquire_mock):
|
||||
nodes = queue.Queue()
|
||||
nodes.put_nowait(('fake_uuid', 'fake', None))
|
||||
self._start_service()
|
||||
self.service._shutdown = True
|
||||
CONF.set_override('send_sensor_data', True, group='conductor')
|
||||
self.service._sensors_nodes_task(self.context, nodes)
|
||||
acquire_mock.__enter__.assert_not_called()
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test_send_sensor_task_no_management(self, acquire_mock):
|
||||
nodes = queue.Queue()
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
fixes:
|
||||
- Shutdown of conductor process should take less time, as we do not wait for
|
||||
completion of all periodic tasks.
|
Loading…
x
Reference in New Issue
Block a user