Introduce new driver call and RPC for heartbeat
Change-Id: Iec31feb07b85f9ed668d354967f8d265233a2bc1 Partial-Bug: #1570841
This commit is contained in:
parent
088f09903b
commit
97f96642a9
@ -81,7 +81,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
"""Ironic Conductor manager main class."""
|
||||
|
||||
# NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
|
||||
RPC_API_VERSION = '1.33'
|
||||
RPC_API_VERSION = '1.34'
|
||||
|
||||
target = messaging.Target(version=RPC_API_VERSION)
|
||||
|
||||
@ -2093,6 +2093,25 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
|
||||
return driver.raid.get_logical_disk_properties()
|
||||
|
||||
@messaging.expected_exceptions(exception.NoFreeConductorWorker)
|
||||
def heartbeat(self, context, node_id, callback_url):
|
||||
"""Process a heartbeat from the ramdisk.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param callback_url: URL to reach back to the ramdisk.
|
||||
:raises: NoFreeConductorWorker if there are no conductors to process
|
||||
this heartbeat request.
|
||||
"""
|
||||
LOG.debug('RPC heartbeat called for node %s', node_id)
|
||||
|
||||
# NOTE(dtantsur): we acquire a shared lock to begin with, drivers are
|
||||
# free to promote it to an exclusive one.
|
||||
with task_manager.acquire(context, node_id, shared=True,
|
||||
purpose='heartbeat') as task:
|
||||
task.spawn_after(self._spawn_worker, task.driver.deploy.heartbeat,
|
||||
task, callback_url)
|
||||
|
||||
def _object_dispatch(self, target, method, context, args, kwargs):
|
||||
"""Dispatch a call to an object method.
|
||||
|
||||
|
@ -80,11 +80,12 @@ class ConductorAPI(object):
|
||||
| object_backport_versions
|
||||
| 1.32 - Add do_node_clean
|
||||
| 1.33 - Added update and destroy portgroup.
|
||||
| 1.34 - Added heartbeat
|
||||
|
||||
"""
|
||||
|
||||
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
|
||||
RPC_API_VERSION = '1.33'
|
||||
RPC_API_VERSION = '1.34'
|
||||
|
||||
def __init__(self, topic=None):
|
||||
super(ConductorAPI, self).__init__()
|
||||
@ -646,6 +647,18 @@ class ConductorAPI(object):
|
||||
return cctxt.call(context, 'do_node_clean',
|
||||
node_id=node_id, clean_steps=clean_steps)
|
||||
|
||||
def heartbeat(self, context, node_id, callback_url, topic=None):
|
||||
"""Process a node heartbeat.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node ID or UUID.
|
||||
:param callback_url: URL to reach back to the ramdisk.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.34')
|
||||
return cctxt.call(context, 'heartbeat', node_id=node_id,
|
||||
callback_url=callback_url)
|
||||
|
||||
def object_class_action_versions(self, context, objname, objmethod,
|
||||
object_versions, args, kwargs):
|
||||
"""Perform an action on a VersionedObject class.
|
||||
|
@ -30,7 +30,7 @@ from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
from ironic.common import exception
|
||||
from ironic.common.i18n import _, _LE
|
||||
from ironic.common.i18n import _, _LE, _LW
|
||||
from ironic.common import raid
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -375,6 +375,17 @@ class DeployInterface(BaseInterface):
|
||||
"""
|
||||
pass
|
||||
|
||||
def heartbeat(self, task, callback_url):
|
||||
"""Record a heartbeat for the node.
|
||||
|
||||
:param task: a TaskManager instance containing the node to act on.
|
||||
:param callback_url: a URL to use to call to the ramdisk.
|
||||
:return: None
|
||||
"""
|
||||
LOG.warning(_LW('Got heartbeat message from node %(node)s, but '
|
||||
'the driver %(driver)s does not support heartbeating'),
|
||||
{'node': task.node.uuid, 'driver': task.node.driver})
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BootInterface(object):
|
||||
|
@ -5016,3 +5016,16 @@ class DoNodeAdoptionTestCase(
|
||||
self.assertEqual(states.MANAGEABLE, node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, node.target_provision_state)
|
||||
self.assertIsNone(node.last_error)
|
||||
|
||||
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
|
||||
def test_heartbeat(self, mock_spawn):
|
||||
"""Test heartbeating."""
|
||||
node = obj_utils.create_test_node(
|
||||
self.context, driver='fake',
|
||||
provision_state=states.DEPLOYING,
|
||||
target_provision_state=states.ACTIVE)
|
||||
|
||||
self._start_service()
|
||||
self.service.heartbeat(self.context, node.uuid, 'http://callback')
|
||||
mock_spawn.assert_called_with(self.driver.deploy.heartbeat,
|
||||
mock.ANY, 'http://callback')
|
||||
|
@ -393,3 +393,10 @@ class RPCAPITestCase(base.DbTestCase):
|
||||
'call',
|
||||
version='1.33',
|
||||
portgroup=self.fake_portgroup)
|
||||
|
||||
def test_heartbeat(self):
|
||||
self._test_rpcapi('heartbeat',
|
||||
'call',
|
||||
node_id='fake-node',
|
||||
callback_url='http://ramdisk.url:port',
|
||||
version='1.34')
|
||||
|
@ -20,6 +20,7 @@ import mock
|
||||
from ironic.common import exception
|
||||
from ironic.common import raid
|
||||
from ironic.drivers import base as driver_base
|
||||
from ironic.drivers.modules import fake
|
||||
from ironic.tests import base
|
||||
|
||||
|
||||
@ -389,3 +390,14 @@ class RAIDInterfaceTestCase(base.TestCase):
|
||||
raid_interface = MyRAIDInterface()
|
||||
raid_interface.get_logical_disk_properties()
|
||||
get_properties_mock.assert_called_once_with(raid_schema)
|
||||
|
||||
|
||||
class TestDeployInterface(base.TestCase):
|
||||
@mock.patch.object(driver_base.LOG, 'warning', autospec=True)
|
||||
def test_warning_on_heartbeat(self, mock_log):
|
||||
# NOTE(dtantsur): FakeDeploy does not override heartbeat
|
||||
deploy = fake.FakeDeploy()
|
||||
deploy.heartbeat(mock.Mock(node=mock.Mock(uuid='uuid',
|
||||
driver='driver')),
|
||||
'url')
|
||||
self.assertTrue(mock_log.called)
|
||||
|
Loading…
x
Reference in New Issue
Block a user