Merge "Add the component registry from Zuul"
This commit is contained in:
commit
492f6d5216
@ -32,6 +32,8 @@ from nodepool import provider_manager
|
||||
from nodepool import stats
|
||||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk import ZooKeeperClient
|
||||
from nodepool.zk.components import BuilderComponent
|
||||
from nodepool.version import get_version_string
|
||||
|
||||
|
||||
MINS = 60
|
||||
@ -1402,6 +1404,13 @@ class NodePoolBuilder(object):
|
||||
)
|
||||
self.zk_client.connect()
|
||||
self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False)
|
||||
|
||||
hostname = socket.gethostname()
|
||||
self.component_info = BuilderComponent(
|
||||
self.zk_client, hostname,
|
||||
version=get_version_string())
|
||||
self.component_info.register()
|
||||
|
||||
self.log.debug('Starting listener for build jobs')
|
||||
|
||||
# Create build and upload worker objects
|
||||
|
@ -641,7 +641,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
|
||||
# want to make sure we don't continuously grow this array.
|
||||
if self.launcher_id not in self.request.declined_by:
|
||||
self.request.declined_by.append(self.launcher_id)
|
||||
launchers = set([x.id for x in self.zk.getRegisteredLaunchers()])
|
||||
launchers = set([x.id for x in self.zk.getRegisteredPools()])
|
||||
if launchers.issubset(set(self.request.declined_by)):
|
||||
# All launchers have declined it
|
||||
self.log.debug("Failing declined node request")
|
||||
|
@ -32,9 +32,10 @@ from nodepool import stats
|
||||
from nodepool import config as nodepool_config
|
||||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk import ZooKeeperClient
|
||||
from nodepool.zk.components import LauncherComponent, PoolComponent
|
||||
from nodepool.driver.utils import QuotaInformation, QuotaSupport
|
||||
from nodepool.logconfig import get_annotated_logger
|
||||
from nodepool.version import version_info as npd_version_info
|
||||
from nodepool.version import get_version_string
|
||||
|
||||
|
||||
MINS = 60
|
||||
@ -109,7 +110,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
# become out of date as the loop progresses, but it should be
|
||||
# good enough to determine whether we should process requests
|
||||
# which express a preference for a specific provider.
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
launchers = self.zk.getRegisteredPools()
|
||||
|
||||
pm = self.getProviderManager()
|
||||
has_quota_support = isinstance(pm, QuotaSupport)
|
||||
@ -174,7 +175,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
candidate_launchers = set(
|
||||
x.id for x in launchers
|
||||
if x.provider_name == req.provider
|
||||
and x.supported_labels.issuperset(req.node_types))
|
||||
and set(x.supported_labels).issuperset(req.node_types))
|
||||
if candidate_launchers:
|
||||
# There is a launcher online which can satisfy the request
|
||||
if not candidate_launchers.issubset(set(req.declined_by)):
|
||||
@ -349,6 +350,22 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
def run(self):
|
||||
self.running = True
|
||||
|
||||
# Make sure we're always registered with ZK
|
||||
hostname = socket.gethostname()
|
||||
self.component_info = PoolComponent(
|
||||
self.zk.zk_client, hostname,
|
||||
version=get_version_string())
|
||||
labels = set()
|
||||
for prov_cfg in self.nodepool.config.providers.values():
|
||||
labels.update(prov_cfg.getSupportedLabels())
|
||||
self.component_info.content.update({
|
||||
'id': self.launcher_id,
|
||||
'provider_name': self.provider_name,
|
||||
'supported_labels': list(labels),
|
||||
'state': self.component_info.RUNNING,
|
||||
})
|
||||
self.component_info.register()
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
# Don't do work if we've lost communication with the ZK cluster
|
||||
@ -360,14 +377,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
if did_suspend:
|
||||
self.log.info("ZooKeeper available. Resuming")
|
||||
|
||||
# Make sure we're always registered with ZK
|
||||
launcher = zk.Launcher()
|
||||
launcher.id = self.launcher_id
|
||||
labels = set()
|
||||
for prov_cfg in self.nodepool.config.providers.values():
|
||||
launcher.supported_labels.update(
|
||||
prov_cfg.getSupportedLabels())
|
||||
launcher.provider_name = self.provider_name
|
||||
self.zk.registerLauncher(launcher)
|
||||
labels.update(prov_cfg.getSupportedLabels())
|
||||
if set(self.component_info.supported_labels) != labels:
|
||||
self.component_info.supported_labels = list(labels)
|
||||
|
||||
self.updateProviderLimits(
|
||||
self.nodepool.config.providers.get(self.provider_name))
|
||||
@ -408,7 +422,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
'''
|
||||
self.log.info("%s received stop" % self.name)
|
||||
self.running = False
|
||||
self.zk.deregisterLauncher(self.launcher_id)
|
||||
self.component_info.unregister()
|
||||
self.stop_event.set()
|
||||
|
||||
|
||||
@ -984,6 +998,12 @@ class NodePool(threading.Thread):
|
||||
)
|
||||
self.zk_client.connect()
|
||||
self.zk = zk.ZooKeeper(self.zk_client)
|
||||
|
||||
hostname = socket.gethostname()
|
||||
self.component_info = LauncherComponent(
|
||||
self.zk_client, hostname,
|
||||
version=get_version_string())
|
||||
self.component_info.register()
|
||||
else:
|
||||
self.log.debug("Detected ZooKeeper server changes")
|
||||
self.zk_client.resetHosts(configured)
|
||||
@ -1144,7 +1164,7 @@ class NodePool(threading.Thread):
|
||||
Start point for the NodePool thread.
|
||||
'''
|
||||
self.log.info("Nodepool launcher %s starting",
|
||||
npd_version_info.release_string())
|
||||
get_version_string())
|
||||
while not self._stopped:
|
||||
try:
|
||||
self.updateConfig()
|
||||
@ -1158,6 +1178,8 @@ class NodePool(threading.Thread):
|
||||
if did_suspend:
|
||||
self.log.info("ZooKeeper available. Resuming")
|
||||
|
||||
if self.component_info.state != self.component_info.RUNNING:
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
self.createMinReady()
|
||||
|
||||
if not self._cleanup_thread:
|
||||
|
16
nodepool/model_api.py
Normal file
16
nodepool/model_api.py
Normal file
@ -0,0 +1,16 @@
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Currently unused. Included here for future use similar to Zuul.
|
||||
MODEL_API = 0
|
@ -20,6 +20,7 @@ import statsd
|
||||
|
||||
from nodepool.zk import zookeeper as zk
|
||||
|
||||
|
||||
log = logging.getLogger("nodepool.stats")
|
||||
|
||||
|
||||
@ -97,7 +98,7 @@ class StatsReporter(object):
|
||||
|
||||
states = {}
|
||||
|
||||
launchers = zk_conn.getRegisteredLaunchers()
|
||||
launchers = zk_conn.getRegisteredPools()
|
||||
labels = set()
|
||||
for launcher in launchers:
|
||||
labels.update(launcher.supported_labels)
|
||||
|
@ -267,9 +267,9 @@ def label_list(zk):
|
||||
# NOTE(ianw): maybe add to each entry a list of which
|
||||
# launchers support the label?
|
||||
labels = set()
|
||||
launchers = zk.getRegisteredLaunchers()
|
||||
launchers = zk.getRegisteredPools()
|
||||
for launcher in launchers:
|
||||
labels.update(launcher.supported_labels)
|
||||
labels.update(set(launcher.supported_labels))
|
||||
|
||||
for label in labels:
|
||||
objs.append({'label': label})
|
||||
|
@ -38,6 +38,7 @@ from nodepool import launcher
|
||||
from nodepool import webapp
|
||||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk import ZooKeeperClient
|
||||
from nodepool.zk.components import COMPONENT_REGISTRY
|
||||
from nodepool.cmd.config_validator import ConfigValidator
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
|
||||
@ -160,9 +161,19 @@ class StatsdFixture(fixtures.Fixture):
|
||||
self.thread.join()
|
||||
|
||||
|
||||
class GlobalRegistryFixture(fixtures.Fixture):
    """Fixture that clears the global component registry on cleanup."""

    def _setUp(self):
        # Nothing to build up front; only schedule the reset for cleanup.
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        # Drop the registry held by the module-level COMPONENT_REGISTRY.
        COMPONENT_REGISTRY.clearRegistry()
|
||||
|
||||
|
||||
class BaseTestCase(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(BaseTestCase, self).setUp()
|
||||
self.useFixture(GlobalRegistryFixture())
|
||||
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60)
|
||||
try:
|
||||
test_timeout = int(test_timeout)
|
||||
|
@ -18,13 +18,16 @@ import math
|
||||
import time
|
||||
import fixtures
|
||||
import mock
|
||||
import socket
|
||||
import testtools
|
||||
|
||||
from nodepool import tests
|
||||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk.components import PoolComponent
|
||||
from nodepool.driver.fake import provider as fakeprovider
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
import nodepool.launcher
|
||||
from nodepool.version import get_version_string
|
||||
|
||||
from kazoo import exceptions as kze
|
||||
|
||||
@ -761,10 +764,17 @@ class TestLauncher(tests.DBTestCase):
|
||||
|
||||
# Create a dummy launcher with a different set of supported labels
|
||||
# than what we are going to request.
|
||||
dummy_launcher = zk.Launcher()
|
||||
dummy_launcher.provider_name = 'other-provider'
|
||||
dummy_launcher.supported_labels = {'other-label', }
|
||||
self.zk.registerLauncher(dummy_launcher)
|
||||
hostname = socket.gethostname()
|
||||
dummy_component = PoolComponent(
|
||||
self.zk.zk_client, hostname,
|
||||
version=get_version_string())
|
||||
dummy_component.content.update({
|
||||
'id': 'dummy',
|
||||
'provider_name': 'other-provider',
|
||||
'supported_labels': ['other-label'],
|
||||
'state': dummy_component.RUNNING,
|
||||
})
|
||||
dummy_component.register()
|
||||
|
||||
# Node request for a specific provider that doesn't support the
|
||||
# requested node type.
|
||||
@ -778,6 +788,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.assertEqual(req.state, zk.FULFILLED)
|
||||
self.assertEqual(len(req.nodes), 1)
|
||||
self.zk.getNode(req.nodes[0])
|
||||
dummy_component.unregister()
|
||||
|
||||
def test_node_boot_from_volume(self):
|
||||
"""Test that an image and node are created from a volume"""
|
||||
@ -1684,7 +1695,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.assertEqual(2, len(pool._pool_threads))
|
||||
|
||||
# We should have two pool workers registered
|
||||
self.assertEqual(2, len(self.zk.getRegisteredLaunchers()))
|
||||
self.assertEqual(2, len(self.zk.getRegisteredPools()))
|
||||
|
||||
self.replace_config(configfile, 'launcher_two_provider_remove.yaml')
|
||||
|
||||
@ -1698,7 +1709,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
pass
|
||||
|
||||
# We should have one pool worker registered
|
||||
self.assertEqual(1, len(self.zk.getRegisteredLaunchers()))
|
||||
self.assertEqual(1, len(self.zk.getRegisteredPools()))
|
||||
|
||||
def test_failed_provider(self):
|
||||
"""Test that broken provider doesn't fail node requests."""
|
||||
@ -1914,18 +1925,19 @@ class TestLauncher(tests.DBTestCase):
|
||||
pool.start()
|
||||
|
||||
self.waitForNodes('fake-label')
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
launchers = self.zk.getRegisteredPools()
|
||||
self.assertEqual(1, len(launchers))
|
||||
|
||||
# the fake-label-unused label should not appear
|
||||
self.assertEqual({'fake-label'}, launchers[0].supported_labels)
|
||||
self.assertEqual({'fake-label'}, set(launchers[0].supported_labels))
|
||||
|
||||
self.replace_config(configfile, 'launcher_reg2.yaml')
|
||||
|
||||
# we should get 1 additional label now
|
||||
while launchers[0].supported_labels != {'fake-label', 'fake-label2'}:
|
||||
while (set(launchers[0].supported_labels) !=
|
||||
{'fake-label', 'fake-label2'}):
|
||||
time.sleep(1)
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
launchers = self.zk.getRegisteredPools()
|
||||
|
||||
@mock.patch('nodepool.driver.openstack.handler.'
|
||||
'OpenStackNodeLauncher._launchNode')
|
||||
|
@ -14,12 +14,39 @@
|
||||
import testtools
|
||||
import time
|
||||
import uuid
|
||||
import socket
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
from nodepool import tests
|
||||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk.components import PoolComponent
|
||||
from nodepool.config import ZooKeeperConnectionConfig, buildZooKeeperHosts
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
from nodepool.version import get_version_string
|
||||
|
||||
|
||||
class TestComponentRegistry(tests.DBTestCase):
    """Exercise pool component registration through the ZooKeeper API."""

    def test_pool_component(self):
        """A registered pool is listed until it is unregistered."""
        component = PoolComponent(
            self.zk.zk_client, socket.gethostname(),
            version=get_version_string())
        pool_id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex
        component.content.update({
            'id': pool_id,
            'provider_name': 'provider',
            'supported_labels': [],
            'state': component.RUNNING,
        })
        component.register()

        # Exactly our component must be visible after registering.
        registered = self.zk.getRegisteredPools()
        self.assertEqual(1, len(registered))
        self.assertEqual(component.id, list(registered)[0].id)

        # And nothing must be left once it has been unregistered.
        component.unregister()
        registered = self.zk.getRegisteredPools()
        self.assertEqual(0, len(registered))
|
||||
|
||||
|
||||
class TestZooKeeper(tests.DBTestCase):
|
||||
@ -564,23 +591,6 @@ class TestZooKeeper(tests.DBTestCase):
|
||||
self.zk.deleteUpload("trusty", "000", "rax", "000001")
|
||||
self.assertIsNone(self.zk.client.exists(path))
|
||||
|
||||
def test_registerLauncher(self):
|
||||
launcher = zk.Launcher()
|
||||
launcher.id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex
|
||||
self.zk.registerLauncher(launcher)
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
self.assertEqual(1, len(launchers))
|
||||
self.assertEqual(launcher.id, launchers[0].id)
|
||||
|
||||
def test_registerLauncher_safe_repeat(self):
|
||||
launcher = zk.Launcher()
|
||||
launcher.id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex
|
||||
self.zk.registerLauncher(launcher)
|
||||
self.zk.registerLauncher(launcher)
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
self.assertEqual(1, len(launchers))
|
||||
self.assertEqual(launcher.id, launchers[0].id)
|
||||
|
||||
def test_getNodeRequests_empty(self):
|
||||
self.assertEqual([], self.zk.getNodeRequests())
|
||||
|
||||
|
@ -30,3 +30,9 @@ try:
|
||||
git_version = _metadata['git_version']
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def get_version_string():
    """Return the release string, plus the git version for non-releases."""
    if not is_release:
        return f"{release_string} {git_version}"
    return release_string
|
||||
|
413
nodepool/zk/components.py
Normal file
413
nodepool/zk/components.py
Normal file
@ -0,0 +1,413 @@
|
||||
# Copyright 2020 BMW Group
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
from kazoo.protocol.states import EventType
|
||||
|
||||
from nodepool.zk import ZooKeeperBase
|
||||
from nodepool.model_api import MODEL_API
|
||||
|
||||
|
||||
COMPONENTS_ROOT = "/nodepool/components"
|
||||
|
||||
|
||||
class GlobalRegistry:
    """Holder for a single, lazily created ComponentRegistry."""

    def __init__(self):
        self.registry = None

    def create(self, zk_client):
        """Return the held registry, creating it from zk_client if unset."""
        existing = self.registry
        if existing:
            return existing
        self.registry = ComponentRegistry(zk_client)
        return self.registry

    def clearRegistry(self):
        """Forget the held registry so that create() builds a new one."""
        self.registry = None

    @property
    def model_api(self):
        """The model_api attribute of the held registry."""
        held = self.registry
        return held.model_api
|
||||
|
||||
|
||||
COMPONENT_REGISTRY = GlobalRegistry()
|
||||
|
||||
|
||||
class BaseComponent(ZooKeeperBase):
    """
    Read/write component object.

    This object holds an offline cache of all the component's attributes. In
    case of a failed update to ZooKeeper the local cache will still hold the
    fresh values. Updating any attribute uploads all attributes to ZooKeeper.

    This enables this object to be used as local key/value store even if the
    ZooKeeper connection got lost.

    :arg client ZooKeeperClient: The ZooKeeperClient object to use, or
        None if this should be a read-only component.
    :arg hostname str: The component's hostname (multiple components with
        the same hostname may be registered; the registry will create unique
        nodes for each).
    :arg version str: Stored under the "version" key of the content.
    """

    # Component states
    INITIALIZING = "initializing"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"

    log = logging.getLogger("nodepool.Component")
    kind = "base"

    def __init__(self, client, hostname, version=None):
        # Ensure that the content is available before setting any other
        # attribute, because our __setattr__ implementation is relying on it.
        self.__dict__["content"] = {
            "hostname": hostname,
            "state": self.STOPPED,
            "kind": self.kind,
            "version": version,
            "model_api": 0,
        }
        super().__init__(client)

        self.path = None
        self._zstat = None
        self.register_lock = threading.Lock()

    def __getattr__(self, name):
        """Resolve attributes not found normally from the content dict.

        :raises AttributeError: if name is not a key of the content.
        """
        # Read the dict straight from __dict__: going through self.content
        # would re-enter __getattr__ endlessly if "content" is not set.
        try:
            return self.__dict__["content"][name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        """Set an attribute; content keys are also written to ZooKeeper."""
        # If the specified attribute is not part of our content dictionary,
        # fall back to the default __setattr__ behaviour.
        if name not in self.content:
            return super().__setattr__(name, value)

        # Set the value in the local content dict
        self.content[name] = value

        with self.register_lock:
            if not self.path:
                self.log.error(
                    "Path is not set on this component, did you forget "
                    "to call register()?"
                )
                return

            # Update the ZooKeeper node
            content = json.dumps(self.content, sort_keys=True).encode("utf-8")
            try:
                zstat = self.kazoo_client.set(
                    self.path, content, version=self._zstat.version
                )
                self._zstat = zstat
            except NoNodeError:
                self.log.error("Could not update %s in ZooKeeper", self)

    def _waitForRegistryState(self, expect_present, attempts=50,
                              interval=0.1):
        """Poll the global registry cache until our path (dis)appears.

        :arg expect_present bool: True to wait until self.path is cached,
            False to wait until it is gone.
        :arg attempts int: Number of polls before giving up.
        :arg interval float: Seconds to sleep between polls.
        :returns: True if the expected state was seen, False on timeout.
        """
        for _ in range(attempts):
            registered = {
                component.path
                for _kind, components in COMPONENT_REGISTRY.registry.all()
                for component in components
            }
            if (self.path in registered) == expect_present:
                return True
            time.sleep(interval)
        return False

    def register(self, model_api=MODEL_API):
        """Create this component's ephemeral sequence node in ZooKeeper.

        :arg model_api int: Value stored under the "model_api" content key.
        """
        self.content['model_api'] = model_api
        with self.register_lock:
            path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname])
            self.log.info("Registering component in ZooKeeper %s", path)
            self.path, self._zstat = self.kazoo_client.create(
                path,
                json.dumps(self.content, sort_keys=True).encode("utf-8"),
                makepath=True,
                ephemeral=True,
                sequence=True,
                # Also return the zstat, which is necessary to successfully
                # update the component.
                include_data=True,
            )

        if not COMPONENT_REGISTRY.registry:
            return

        # Wait 5 seconds for the component to appear in our local
        # cache so that operations which rely on lists of available
        # labels, etc, behave more synchronously.
        if not self._waitForRegistryState(expect_present=True):
            self.log.info("Did not see component registration for %s", path)

    def unregister(self):
        """Delete this component's node and make the object unusable."""
        with self.register_lock:
            self.log.info("Unregistering component in ZooKeeper %s", self.path)
            self.client.on_connect_listeners.remove(self._onConnect)
            self.client.on_disconnect_listeners.remove(self._onDisconnect)
            self.client.on_reconnect_listeners.remove(self._onReconnect)
            self.kazoo_client.delete(self.path)
        # Break the object so we can't register again
        del self.register_lock

        # Without a global registry there is no local cache to wait for
        # (register() applies the same guard).
        if not COMPONENT_REGISTRY.registry:
            return

        # Wait 5 seconds for the component to disappear from our local
        # cache so that operations which rely on lists of available
        # labels, etc, behave more synchronously.
        if not self._waitForRegistryState(expect_present=False):
            self.log.info(
                "Did not see component unregistration for %s", self.path)

    def _onReconnect(self):
        # Reconnect listener: create the component node again.
        self.register()

    def updateFromDict(self, data):
        """Merge data into the local content without writing to ZooKeeper."""
        self.content.update(data)

    @classmethod
    def fromDict(cls, client, hostname, data):
        """Build a component for hostname and load data into its content."""
        component = cls(client, hostname)
        component.updateFromDict(data)
        return component

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.content}>"
|
||||
|
||||
|
||||
class LauncherComponent(BaseComponent):
    """Component stored under the "launcher" kind."""

    kind = "launcher"
|
||||
|
||||
|
||||
class BuilderComponent(BaseComponent):
    """Component stored under the "builder" kind."""

    kind = "builder"
|
||||
|
||||
|
||||
# Not a process, but rather a provider-pool within a launcher.
|
||||
class PoolComponent(BaseComponent):
    """Component stored under the "pool" kind.

    On top of the base content it carries the keys "id",
    "provider_name" and "supported_labels".
    """

    kind = "pool"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Pool-specific keys, seeded with empty values.
        defaults = dict(id=None, provider_name=None, supported_labels=[])
        self.initial_state = defaults
        self.content.update(defaults)
|
||||
|
||||
|
||||
class ComponentRegistry(ZooKeeperBase):
    """A component registry is organized like:

      /nodepool/components/{kind}/{sequence}

    Where kind corresponds to one of the classes in this module, and
    sequence is a ZK sequence node with a prefix of the hostname in
    order to produce a unique id for multiple identical components
    running on the same host.  An example path:

      /nodepool/components/launcher/hostname0000000000

    Components are ephemeral nodes, and so if the actual service
    disconnects from ZK, the node will disappear.

    Component objects returned by this class are read-only; updating
    their attributes will not be reflected in ZooKeeper.
    """
    log = logging.getLogger("nodepool.ComponentRegistry")

    COMPONENT_CLASSES = {
        "pool": PoolComponent,
        "launcher": LauncherComponent,
        "builder": BuilderComponent,
    }

    def __init__(self, client):
        super().__init__(client)

        self.client = client
        self._component_tree = None
        # kind -> hostname -> component
        self._cached_components = defaultdict(dict)

        self.model_api = None
        # Have we initialized enough to trust the model_api
        self._init = False
        # If we are already connected when the class is instantiated, directly
        # call the onConnect callback.
        if self.client.connected:
            self._onConnect()

    def _getComponentRoot(self, kind):
        # Parent path of all component nodes of the given kind.
        return '/'.join([COMPONENTS_ROOT, kind])

    def _getComponentPath(self, kind, hostname):
        # Path of a single component node; "hostname" here is the node
        # name found under the kind root.
        return '/'.join([COMPONENTS_ROOT, kind, hostname])

    def _onConnect(self):
        # Install one children watch per known kind, then compute the
        # initial minimum model API.
        for kind in self.COMPONENT_CLASSES.keys():
            root = self._getComponentRoot(kind)
            self.kazoo_client.ensure_path(root)
            self.kazoo_client.ChildrenWatch(
                root, self._makeComponentRootWatcher(kind))
        self._init = True
        self._updateMinimumModelApi()

    def _makeComponentRootWatcher(self, kind):
        # Bind kind into a callback with the ChildrenWatch signature.
        def watch(children, event=None):
            return self._onComponentRootUpdate(kind, children, event)
        return watch

    def _onComponentRootUpdate(self, kind, children, event):
        # Start a data watch for every child that is not cached yet.
        for hostname in children:
            component = self._cached_components[kind].get(hostname)
            if not component:
                self.log.info("Noticed new %s component %s", kind, hostname)
                root = self._getComponentPath(kind, hostname)
                self.kazoo_client.DataWatch(
                    root, self._makeComponentWatcher(kind, hostname))

    def _makeComponentWatcher(self, kind, hostname):
        # Bind kind and hostname into a callback with the DataWatch
        # signature.
        def watch(data, stat, event=None):
            return self._onComponentUpdate(kind, hostname, data, stat, event)
        return watch

    def _onComponentUpdate(self, kind, hostname, data, stat, event):
        """Apply a data watch notification to the local cache.

        :returns: False once the node is gone (which stops the data
            watch), otherwise None.
        """
        if event:
            etype = event.type
        else:
            etype = None
        self.log.debug(
            "Registry %s got event %s for %s %s",
            self, etype, kind, hostname)
        if (etype in (None, EventType.CHANGED, EventType.CREATED) and
                data is not None):

            # Perform an in-place update of the cached component (if any)
            component = self._cached_components.get(kind, {}).get(hostname)
            d = json.loads(data.decode("utf-8"))

            self.log.info(
                "Component %s %s updated: %s",
                kind, hostname, d)

            if component:
                if (stat.version <= component._zstat.version):
                    # Don't update to older data
                    return
                component.updateFromDict(d)
                component._zstat = stat
            else:
                # Create a new component from the data structure
                # Get the correct kind of component
                # TODO (felix): KeyError on unknown component type?
                component_cls = self.COMPONENT_CLASSES[kind]
                # Pass in null ZK client to make a read-only component
                # instance.
                component = component_cls.fromDict(None, hostname, d)
                component.path = self._getComponentPath(kind, hostname)
                component._zstat = stat

            self._cached_components[kind][hostname] = component
            self._updateMinimumModelApi()
        elif (etype == EventType.DELETED or data is None):
            self.log.info(
                "Noticed %s component %s disappeared",
                kind, hostname)
            try:
                del self._cached_components[kind][hostname]
            except KeyError:
                # If it's already gone, don't care
                pass
            self._updateMinimumModelApi()
            # Return False to stop the datawatch
            return False

    def all(self, kind=None):
        """Returns a list of components.

        If kind is None, then a list of tuples is returned, with each tuple
        being (kind, [list of components]).

        The returned lists are snapshots, so later changes to the cache
        do not affect a caller that is still iterating over them.

        :arg kind str: The type of component to look up in the registry, or
            None to return all kinds
        """

        if kind is None:
            # Copy the items first: the watch callbacks add and remove
            # entries in these dicts.
            return [(k, list(components.values()))
                    for (k, components)
                    in list(self._cached_components.items())]

        # Filter the cached components for the given kind
        return list(self._cached_components.get(kind, {}).values())

    def getMinimumModelApi(self):
        """Get the minimum model API version of all currently connected
        components"""

        # Start with our own version in case we're the only component
        # and we haven't registered.
        version = MODEL_API
        for kind, components in self.all():
            for component in components:
                version = min(version, component.model_api)
        return version

    def _updateMinimumModelApi(self):
        """Recompute self.model_api and log how it relates to MODEL_API.

        Does nothing until _onConnect has completed.  Exits the process
        if the first computed minimum is newer than this component's
        MODEL_API.
        """
        if not self._init:
            return
        version = self.getMinimumModelApi()
        if version != self.model_api:
            self.log.info(f"System minimum data model version {version}; "
                          f"this component {MODEL_API}")
        if self.model_api is None:
            # First computation since startup.
            if version < MODEL_API:
                self.log.info("The data model version of this component is "
                              "newer than the rest of the system; this "
                              "component will operate in compatability mode "
                              "until the system is upgraded")
            elif version > MODEL_API:
                self.log.error("The data model version of this component is "
                               "older than the rest of the system; "
                               "exiting to prevent data corruption")
                sys.exit(1)
        else:
            if version > self.model_api:
                if version > MODEL_API:
                    self.log.info("The data model version of this component "
                                  "is older than other components in the "
                                  "system, so other components will operate "
                                  "in a compability mode; upgrade this "
                                  "component as soon as possible to complete "
                                  "the system upgrade")
                elif version == MODEL_API:
                    self.log.info("The rest of the system has been upgraded "
                                  "to the data model version of this "
                                  "component")
            elif version < self.model_api:
                self.log.error("A component with a data model version older "
                               "than the rest of the system has been started; "
                               "data corruption is very likely to occur.")
                # Should we exit here as well?
        self.model_api = version
|
@ -26,6 +26,7 @@ from kazoo.recipe.election import Election
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
from nodepool.logconfig import get_annotated_logger
|
||||
from nodepool.zk.components import COMPONENT_REGISTRY
|
||||
|
||||
# States:
|
||||
# We are building this image (or node) but it is not ready for use.
|
||||
@ -113,56 +114,6 @@ class Serializable(abc.ABC):
|
||||
return json.dumps(self.toDict()).encode('utf8')
|
||||
|
||||
|
||||
class Launcher(Serializable):
    '''
    Description of a nodepool launcher: its id, provider name and the
    set of labels it supports.
    '''

    def __init__(self):
        self.id = None
        self.provider_name = None
        self._supported_labels = set()

    def __eq__(self, other):
        # Only another Launcher with the same id and labels is equal.
        if not isinstance(other, Launcher):
            return False
        return (self.id == other.id and
                self.supported_labels == other.supported_labels)

    def __hash__(self):
        return hash(self.id)

    @property
    def supported_labels(self):
        return self._supported_labels

    @supported_labels.setter
    def supported_labels(self, value):
        if isinstance(value, set):
            self._supported_labels = value
            return
        raise TypeError("'supported_labels' attribute must be a set")

    def toDict(self):
        return {
            'id': self.id,
            'provider_name': self.provider_name,
            # sets are not JSON serializable, so use a sorted list
            'supported_labels': sorted(self.supported_labels),
        }

    @staticmethod
    def fromDict(d):
        launcher = Launcher()
        launcher.id = d.get('id')
        # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
        #                having a full nodepool shutdown on upgrade. It can be
        #                removed later.
        launcher.provider_name = d.get('provider_name', 'unknown')
        launcher.supported_labels = set(d.get('supported_labels', []))
        return launcher
|
||||
|
||||
|
||||
class BaseModel(Serializable):
|
||||
VALID_STATES = set([])
|
||||
|
||||
@ -764,6 +715,8 @@ class ZooKeeper(object):
|
||||
self._request_cache.listen(self.requestCacheListener)
|
||||
self._request_cache.start()
|
||||
|
||||
COMPONENT_REGISTRY.create(self.zk_client)
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
# =======================================================================
|
||||
@ -1588,65 +1541,14 @@ class ZooKeeper(object):
|
||||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
def registerLauncher(self, launcher):
|
||||
def getRegisteredPools(self):
|
||||
'''
|
||||
Register an active node launcher.
|
||||
Get a list of all launcher pools that have registered with ZooKeeper.
|
||||
|
||||
The launcher is de-registered when the launcher process terminates or
|
||||
otherwise disconnects from ZooKeeper, or via deregisterLauncher().
|
||||
It will need to re-register after a lost connection. This method is
|
||||
safe to call multiple times.
|
||||
|
||||
:param Launcher launcher: Object describing the launcher.
|
||||
:returns: A list of PoolComponent objects, or empty list if none
|
||||
are found.
|
||||
'''
|
||||
path = self._launcherPath(launcher.id)
|
||||
|
||||
if self.client.exists(path):
|
||||
data, _ = self.client.get(path)
|
||||
obj = Launcher.fromDict(self._bytesToDict(data))
|
||||
if obj != launcher:
|
||||
self.client.set(path, launcher.serialize())
|
||||
self.log.debug("Updated registration for launcher %s",
|
||||
launcher.id)
|
||||
else:
|
||||
self.client.create(path, value=launcher.serialize(),
|
||||
makepath=True, ephemeral=True)
|
||||
self.log.debug("Registered launcher %s", launcher.id)
|
||||
|
||||
def deregisterLauncher(self, launcher_id):
|
||||
'''
|
||||
Deregister an active node launcher.
|
||||
|
||||
:param str launcher_id: ID of the launcher to deregister.
|
||||
'''
|
||||
path = self._launcherPath(launcher_id)
|
||||
try:
|
||||
self.client.delete(path, recursive=True)
|
||||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
def getRegisteredLaunchers(self):
|
||||
'''
|
||||
Get a list of all launchers that have registered with ZooKeeper.
|
||||
|
||||
:returns: A list of Launcher objects, or empty list if none are found.
|
||||
'''
|
||||
try:
|
||||
launcher_ids = self.client.get_children(self.LAUNCHER_ROOT)
|
||||
except kze.NoNodeError:
|
||||
return []
|
||||
|
||||
objs = []
|
||||
for launcher in launcher_ids:
|
||||
path = self._launcherPath(launcher)
|
||||
try:
|
||||
data, _ = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
# launcher disappeared
|
||||
continue
|
||||
|
||||
objs.append(Launcher.fromDict(self._bytesToDict(data)))
|
||||
return objs
|
||||
return list(COMPONENT_REGISTRY.registry.all(kind='pool'))
|
||||
|
||||
def getNodeRequests(self):
|
||||
'''
|
||||
|
11
releasenotes/notes/component-registry-327e1ade02155e39.yaml
Normal file
11
releasenotes/notes/component-registry-327e1ade02155e39.yaml
Normal file
@ -0,0 +1,11 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
Due to an internal change in how Nodepool launchers communicate
|
||||
with each other, all launchers should be upgraded to the same
|
||||
version within a short period of time.
|
||||
|
||||
They will generally continue to work at different versions, but
|
||||
the mechanism that allows them to yield to specific providers when
|
||||
requested is being changed and so that will not function correctly
|
||||
unless they are upgraded near-simultaneously.
|
Loading…
x
Reference in New Issue
Block a user