Extend the Conductor RPC object
This patch is extending the conductor RPC object adding a method for registering and unregistering conductors. The touch() method was also update to make the context parameter optional (since it's not actually needed). The base_manager.py module have been updated to use the new methods. Change-Id: Ifb74909bc2cd39ce4ad056f7174285497e35035c
This commit is contained in:
parent
ee841f2a9f
commit
6dd72d9426
@ -36,6 +36,7 @@ from ironic.common import rpc
|
||||
from ironic.common import states
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.db import api as dbapi
|
||||
from ironic import objects
|
||||
|
||||
|
||||
conductor_opts = [
|
||||
@ -141,8 +142,8 @@ class BaseConductorManager(object):
|
||||
self.dbapi.clear_node_reservations_for_conductor(self.host)
|
||||
try:
|
||||
# Register this conductor with the cluster
|
||||
cdr = self.dbapi.register_conductor({'hostname': self.host,
|
||||
'drivers': driver_names})
|
||||
self.conductor = objects.Conductor.register(
|
||||
admin_context, self.host, driver_names)
|
||||
except exception.ConductorAlreadyRegistered:
|
||||
# This conductor was already registered and did not shut down
|
||||
# properly, so log a warning and update the record.
|
||||
@ -150,10 +151,8 @@ class BaseConductorManager(object):
|
||||
_LW("A conductor with hostname %(hostname)s "
|
||||
"was previously registered. Updating registration"),
|
||||
{'hostname': self.host})
|
||||
cdr = self.dbapi.register_conductor({'hostname': self.host,
|
||||
'drivers': driver_names},
|
||||
update_existing=True)
|
||||
self.conductor = cdr
|
||||
self.conductor = objects.Conductor.register(
|
||||
admin_context, self.host, driver_names, update_existing=True)
|
||||
|
||||
# Start periodic tasks
|
||||
self._periodic_tasks_worker = self._executor.submit(
|
||||
@ -201,7 +200,7 @@ class BaseConductorManager(object):
|
||||
# Inform the cluster that this conductor is shutting down.
|
||||
# Note that rebalancing will not occur immediately, but when
|
||||
# the periodic sync takes place.
|
||||
self.dbapi.unregister_conductor(self.host)
|
||||
self.conductor.unregister()
|
||||
LOG.info(_LI('Successfully stopped conductor with hostname '
|
||||
'%(hostname)s.'),
|
||||
{'hostname': self.host})
|
||||
@ -283,7 +282,7 @@ class BaseConductorManager(object):
|
||||
def _conductor_service_record_keepalive(self):
|
||||
while not self._keepalive_evt.is_set():
|
||||
try:
|
||||
self.dbapi.touch_conductor(self.host)
|
||||
self.conductor.touch()
|
||||
except db_exception.DBConnectionError:
|
||||
LOG.warning(_LW('Conductor could not connect to database '
|
||||
'while heartbeating.'))
|
||||
|
@ -24,6 +24,10 @@ from ironic.objects import fields as object_fields
|
||||
|
||||
@base.IronicObjectRegistry.register
|
||||
class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: Add register() and unregister(), make the context parameter
|
||||
# to touch() optional.
|
||||
VERSION = '1.1'
|
||||
|
||||
dbapi = db_api.get_instance()
|
||||
|
||||
@ -79,6 +83,37 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
|
||||
# methods can be used in the future to replace current explicit RPC calls.
|
||||
# Implications of calling new remote procedures should be thought through.
|
||||
# @object_base.remotable
|
||||
def touch(self, context):
|
||||
def touch(self, context=None):
|
||||
"""Touch this conductor's DB record, marking it as up-to-date."""
|
||||
self.dbapi.touch_conductor(self.hostname)
|
||||
|
||||
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
|
||||
# methods can be used in the future to replace current explicit RPC calls.
|
||||
# Implications of calling new remote procedures should be thought through.
|
||||
# @object_base.remotable
|
||||
@classmethod
|
||||
def register(cls, context, hostname, drivers, update_existing=False):
|
||||
"""Register an active conductor with the cluster.
|
||||
|
||||
:param hostname: the hostname on which the conductor will run
|
||||
:param drivers: the list of drivers enabled in the conductor
|
||||
:param update_existing: When false, registration will raise an
|
||||
exception when a conflicting online record
|
||||
is found. When true, will overwrite the
|
||||
existing record. Default: False.
|
||||
:raises: ConductorAlreadyRegistered
|
||||
:returns: a :class:`Conductor` object.
|
||||
|
||||
"""
|
||||
db_cond = cls.dbapi.register_conductor({'hostname': hostname,
|
||||
'drivers': drivers},
|
||||
update_existing=update_existing)
|
||||
return Conductor._from_db_object(cls(context), db_cond)
|
||||
|
||||
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
|
||||
# methods can be used in the future to replace current explicit RPC calls.
|
||||
# Implications of calling new remote procedures should be thought through.
|
||||
# @object_base.remotable
|
||||
def unregister(self, context=None):
|
||||
"""Remove this conductor from the service registry."""
|
||||
self.dbapi.unregister_conductor(self.hostname)
|
||||
|
@ -85,3 +85,34 @@ class TestConductorObject(base.DbTestCase):
|
||||
c.updated_at)
|
||||
self.assertEqual(expected, mock_get_cdr.call_args_list)
|
||||
self.assertEqual(self.context, c._context)
|
||||
|
||||
def _test_register(self, update_existing=False):
|
||||
host = self.fake_conductor['hostname']
|
||||
drivers = self.fake_conductor['drivers']
|
||||
with mock.patch.object(self.dbapi, 'register_conductor',
|
||||
autospec=True) as mock_register_cdr:
|
||||
mock_register_cdr.return_value = self.fake_conductor
|
||||
c = objects.Conductor.register(self.context, host, drivers,
|
||||
update_existing=update_existing)
|
||||
|
||||
self.assertIsInstance(c, objects.Conductor)
|
||||
mock_register_cdr.assert_called_once_with(
|
||||
{'drivers': drivers, 'hostname': host},
|
||||
update_existing=update_existing)
|
||||
|
||||
def test_register(self):
|
||||
self._test_register()
|
||||
|
||||
def test_register_update_existing_true(self):
|
||||
self._test_register(update_existing=True)
|
||||
|
||||
def test_unregister(self):
|
||||
host = self.fake_conductor['hostname']
|
||||
with mock.patch.object(self.dbapi, 'get_conductor',
|
||||
autospec=True) as mock_get_cdr:
|
||||
with mock.patch.object(self.dbapi, 'unregister_conductor',
|
||||
autospec=True) as mock_unregister_cdr:
|
||||
mock_get_cdr.return_value = self.fake_conductor
|
||||
c = objects.Conductor.get_by_hostname(self.context, host)
|
||||
c.unregister()
|
||||
mock_unregister_cdr.assert_called_once_with(host)
|
||||
|
@ -409,7 +409,7 @@ expected_object_fingerprints = {
|
||||
'Chassis': '1.3-d656e039fd8ae9f34efc232ab3980905',
|
||||
'Port': '1.5-a224755c3da5bc5cf1a14a11c0d00f3f',
|
||||
'Portgroup': '1.0-1ac4db8fa31edd9e1637248ada4c25a1',
|
||||
'Conductor': '1.0-5091f249719d4a465062a1b3dc7f860d'
|
||||
'Conductor': '1.1-5091f249719d4a465062a1b3dc7f860d'
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user