Improve graceful shutdown of conductor process

When the conductor is being stopped, it waits for completion of
all periodic tasks that are already running. If many nodes are
assigned to the conductor this may take a long time, and the
oslo.service library can kill the thread on timeout. This patch adds
code that stops iterating over nodes in periodic tasks once the
conductor is being stopped. These changes reduce both the shutdown
time and the probability of leaving nodes locked after shutdown.

Closes-Bug: #1701495
Change-Id: If6ea48d01132817a6f47560d3f6ee1756ebfab39
This commit is contained in:
Yuriy Zveryanskyy 2017-05-05 16:19:49 +03:00
parent cd3729ed31
commit b720359c06
5 changed files with 39 additions and 1 deletions

View File

@ -74,6 +74,7 @@ class BaseConductorManager(object):
self.topic = topic self.topic = topic
self.sensors_notifier = rpc.get_sensors_notifier() self.sensors_notifier = rpc.get_sensors_notifier()
self._started = False self._started = False
self._shutdown = None
def init_host(self, admin_context=None): def init_host(self, admin_context=None):
"""Initialize the conductor host. """Initialize the conductor host.
@ -91,6 +92,7 @@ class BaseConductorManager(object):
if self._started: if self._started:
raise RuntimeError(_('Attempt to start an already running ' raise RuntimeError(_('Attempt to start an already running '
'conductor manager')) 'conductor manager'))
self._shutdown = False
self.dbapi = dbapi.get_instance() self.dbapi = dbapi.get_instance()
@ -262,6 +264,7 @@ class BaseConductorManager(object):
# conductor (e.g. when rpc server is unreachable). # conductor (e.g. when rpc server is unreachable).
if not hasattr(self, 'conductor'): if not hasattr(self, 'conductor'):
return return
self._shutdown = True
self._keepalive_evt.set() self._keepalive_evt.set()
if deregister: if deregister:
try: try:
@ -362,6 +365,8 @@ class BaseConductorManager(object):
columns = ['uuid', 'driver'] + list(fields or ()) columns = ['uuid', 'driver'] + list(fields or ())
node_list = self.dbapi.get_nodeinfo_list(columns=columns, **kwargs) node_list = self.dbapi.get_nodeinfo_list(columns=columns, **kwargs)
for result in node_list: for result in node_list:
if self._shutdown:
break
if self._mapped_to_this_conductor(*result[:2]): if self._mapped_to_this_conductor(*result[:2]):
yield result yield result

View File

@ -2147,7 +2147,7 @@ class ConductorManager(base_manager.BaseConductorManager):
@METRICS.timer('ConductorManager._sensors_nodes_task') @METRICS.timer('ConductorManager._sensors_nodes_task')
def _sensors_nodes_task(self, context, nodes): def _sensors_nodes_task(self, context, nodes):
"""Sends sensors data for nodes from synchronized queue.""" """Sends sensors data for nodes from synchronized queue."""
while True: while not self._shutdown:
try: try:
node_uuid, driver, instance_uuid = nodes.get_nowait() node_uuid, driver, instance_uuid = nodes.get_nowait()
except queue.Empty: except queue.Empty:

View File

@ -272,6 +272,12 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
missing_parameters_error): missing_parameters_error):
self._start_service() self._start_service()
def test_conductor_shutdown_flag(self):
# Verify the lifecycle of the ``_shutdown`` flag: it must read false
# after the service has been started ...
self._start_service()
self.assertFalse(self.service._shutdown)
# ... and del_host() must flip it to true so that running periodic
# tasks can notice the shutdown.
self.service.del_host()
self.assertTrue(self.service._shutdown)
class CheckInterfacesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): class CheckInterfacesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test__check_enabled_interfaces_success(self): def test__check_enabled_interfaces_success(self):

View File

@ -2848,6 +2848,19 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
'deploying', 'provision_updated_at', 'deploying', 'provision_updated_at',
last_error=mock.ANY) last_error=mock.ANY)
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
def test_iter_nodes_shutdown(self, mock_nodeinfo_list):
# Verify that iter_nodes() yields nothing once the shutdown flag is
# set, even though the (mocked) DB layer returns one node.
self._start_service()
self.columns = ['uuid', 'driver', 'id']
nodes = [self._create_node(driver='fake')]
mock_nodeinfo_list.return_value = self._get_nodeinfo_list_response(
nodes)
# Simulate a conductor that is already shutting down.
self.service._shutdown = True
# list() drains the generator so the shutdown check inside the
# iteration loop is actually exercised.
result = list(self.service.iter_nodes(fields=['id'],
filters=mock.sentinel.filters))
self.assertEqual([], result)
@mgr_utils.mock_record_keepalive @mgr_utils.mock_record_keepalive
class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
@ -3511,6 +3524,16 @@ class UpdatePortTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertEqual(5, validate_mock.call_count) self.assertEqual(5, validate_mock.call_count)
self.assertEqual(5, get_sensors_data_mock.call_count) self.assertEqual(5, get_sensors_data_mock.call_count)
@mock.patch.object(task_manager, 'acquire')
def test_send_sensor_task_shutdown(self, acquire_mock):
    """Check that the sensors worker does nothing once shutdown is flagged.

    One node is queued, but with ``_shutdown`` already set the worker
    loop (``while not self._shutdown``) must exit before dequeuing or
    acquiring it.
    """
    nodes = queue.Queue()
    nodes.put_nowait(('fake_uuid', 'fake', None))
    self._start_service()
    self.service._shutdown = True
    CONF.set_override('send_sensor_data', True, group='conductor')
    self.service._sensors_nodes_task(self.context, nodes)
    # Assert on the patched callable itself. ``acquire_mock.__enter__`` is
    # never invoked by the code under test (the context manager entered
    # would be ``acquire_mock.return_value``), so an assertion on it
    # passes regardless of what the worker did.
    acquire_mock.assert_not_called()
    # The queued node must still be there: the loop body never ran.
    self.assertEqual(1, nodes.qsize())
@mock.patch.object(task_manager, 'acquire', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True)
def test_send_sensor_task_no_management(self, acquire_mock): def test_send_sensor_task_no_management(self, acquire_mock):
nodes = queue.Queue() nodes = queue.Queue()

View File

@ -0,0 +1,4 @@
---
fixes:
- Shutdown of the conductor process now takes less time: periodic tasks stop
iterating over nodes once shutdown begins instead of running to completion.