Pass zk connection to ProviderManager.start()
In order to support static node pre-registration, we need to give the provider manager the opportunity to register/deregister any nodes in its configuration file when it starts (on startup or when the config change). It will need a ZooKeeper connection to do this. The OpenStack driver will ignore this parameter. Change-Id: Idd00286b2577921b3fe5b55e8f13a27f2fbde5d6
This commit is contained in:
parent
7eeefebbd4
commit
a418aabb7a
@ -536,6 +536,7 @@ class CleanupWorker(BaseWorker):
|
||||
|
||||
self._checkForZooKeeperChanges(new_config)
|
||||
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
||||
self._zk,
|
||||
use_taskmanager=False)
|
||||
self._config = new_config
|
||||
|
||||
@ -878,6 +879,7 @@ class UploadWorker(BaseWorker):
|
||||
|
||||
self._checkForZooKeeperChanges(new_config)
|
||||
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
||||
self._zk,
|
||||
use_taskmanager=False)
|
||||
self._config = new_config
|
||||
|
||||
|
@ -251,7 +251,7 @@ class NodePoolCmd(NodepoolApp):
|
||||
return
|
||||
provider = self.pool.config.providers[node.provider]
|
||||
manager = provider_manager.get_provider(provider, True)
|
||||
manager.start()
|
||||
manager.start(self.zk)
|
||||
launcher.NodeDeleter.delete(self.zk, manager, node)
|
||||
manager.stop()
|
||||
else:
|
||||
|
@ -145,11 +145,15 @@ class Provider(object, metaclass=abc.ABCMeta):
|
||||
|
||||
"""
|
||||
@abc.abstractmethod
|
||||
def start(self):
|
||||
def start(self, zk_conn):
|
||||
"""Start this provider
|
||||
|
||||
:param ZooKeeper zk_conn: A ZooKeeper connection object.
|
||||
|
||||
This is called after each configuration change to allow the driver
|
||||
to perform initialization tasks and start background threads.
|
||||
to perform initialization tasks and start background threads. The
|
||||
ZooKeeper connection object is provided if the Provider needs to
|
||||
interact with it.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
@ -118,7 +118,7 @@ class OpenStackProvider(Provider):
|
||||
self._taskmanager = None
|
||||
self._current_nodepool_quota = None
|
||||
|
||||
def start(self):
|
||||
def start(self, zk_conn):
|
||||
if self._use_taskmanager:
|
||||
self._taskmanager = TaskManager(None, self.provider.name,
|
||||
self.provider.rate)
|
||||
|
@ -57,7 +57,7 @@ class StaticNodeProvider(Provider):
|
||||
raise StaticNodeError("%s: host key mismatches (%s)" %
|
||||
(node["name"], keys))
|
||||
|
||||
def start(self):
|
||||
def start(self, zk_conn):
|
||||
for pool in self.provider.pools.values():
|
||||
self.pools[pool.name] = {}
|
||||
for node in pool.nodes:
|
||||
|
@ -22,7 +22,7 @@ class TestProvider(Provider):
|
||||
def __init__(self, provider):
|
||||
self.provider = provider
|
||||
|
||||
def start(self):
|
||||
def start(self, zk_conn):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
|
@ -819,8 +819,9 @@ class NodePool(threading.Thread):
|
||||
|
||||
def updateConfig(self):
|
||||
config = self.loadConfig()
|
||||
provider_manager.ProviderManager.reconfigure(self.config, config)
|
||||
self.reconfigureZooKeeper(config)
|
||||
provider_manager.ProviderManager.reconfigure(self.config, config,
|
||||
self.getZK())
|
||||
self.setConfig(config)
|
||||
|
||||
def removeCompletedRequests(self):
|
||||
|
@ -30,7 +30,18 @@ class ProviderManager(object):
|
||||
log = logging.getLogger("nodepool.ProviderManager")
|
||||
|
||||
@staticmethod
|
||||
def reconfigure(old_config, new_config, use_taskmanager=True):
|
||||
def reconfigure(old_config, new_config, zk_conn, use_taskmanager=True):
|
||||
'''
|
||||
Reconfigure the provider managers on any configuration changes.
|
||||
|
||||
If a provider configuration changes, stop the current provider
|
||||
manager we have cached and replace it with a new one.
|
||||
|
||||
:param Config old_config: The previously read configuration.
|
||||
:param Config new_config: The newly read configuration.
|
||||
:param ZooKeeper zk_conn: A ZooKeeper connection object.
|
||||
:param bool use_taskmanager: If True, use a task manager.
|
||||
'''
|
||||
stop_managers = []
|
||||
for p in new_config.providers.values():
|
||||
oldmanager = None
|
||||
@ -46,7 +57,7 @@ class ProviderManager(object):
|
||||
" for %s" % p.name)
|
||||
new_config.provider_managers[p.name] = \
|
||||
get_provider(p, use_taskmanager)
|
||||
new_config.provider_managers[p.name].start()
|
||||
new_config.provider_managers[p.name].start(zk_conn)
|
||||
|
||||
for stop_manager in stop_managers:
|
||||
stop_manager.stop()
|
||||
|
@ -61,7 +61,7 @@ class TestShadeIntegration(tests.IntegrationTestCase):
|
||||
self.assertIn('real-provider', config.providers)
|
||||
pm = provider_manager.get_provider(
|
||||
config.providers['real-provider'], use_taskmanager=False)
|
||||
pm.start()
|
||||
pm.start(None)
|
||||
self.assertEqual(pm._client.auth, auth_data)
|
||||
|
||||
def test_nodepool_occ_config_reload(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user