From 775b8336176bff6a800f7fcf1bea42e06d3117eb Mon Sep 17 00:00:00 2001
From: Clark Boylan
Date: Mon, 27 Jul 2020 14:40:21 -0700
Subject: [PATCH] Add basic max-servers handling to the k8s driver

This maps zk "nodes" to k8s resources, currently pods or namespaces, in
order to count them and apply max-servers limitations. Since a pod is
always in a namespace, we've essentially mapped "nodes" to k8s
namespaces. This limit is quite basic: a job running in a namespace can
still consume an arbitrary amount of resources, but if you control the
jobs and cap the number of concurrent jobs via max-servers (effectively
max namespaces), you get some measure of control.

To make this work we adapt the simple driver's handler class for use by
the kubernetes driver. In particular we provide a way to override the
launcher class. Otherwise the code necessary to handle quota checks is
identical, so we reuse it.

Change-Id: Ieb9f179d99b322a9cbf4bc19d4f495bcda0d1ea8
---
 nodepool/driver/kubernetes/handler.py         | 49 ++-----------------
 nodepool/driver/kubernetes/provider.py        | 23 ++++++++-
 nodepool/driver/simple.py                     | 10 ++--
 .../fixtures/functional/kubernetes/basic.yaml |  1 +
 nodepool/tests/fixtures/kubernetes.yaml       |  1 +
 nodepool/tests/unit/test_driver_kubernetes.py | 43 ++++++++++++++++
 6 files changed, 76 insertions(+), 51 deletions(-)

diff --git a/nodepool/driver/kubernetes/handler.py b/nodepool/driver/kubernetes/handler.py
index 17c1251ea..02474f309 100644
--- a/nodepool/driver/kubernetes/handler.py
+++ b/nodepool/driver/kubernetes/handler.py
@@ -17,8 +17,8 @@ import logging
 from kazoo import exceptions as kze
 
 from nodepool import zk
+from nodepool.driver.simple import SimpleTaskManagerHandler
 from nodepool.driver.utils import NodeLauncher
-from nodepool.driver import NodeRequestHandler
 
 
 class K8SLauncher(NodeLauncher):
@@ -76,50 +76,7 @@ class K8SLauncher(NodeLauncher):
                 attempts += 1
 
 
-class KubernetesNodeRequestHandler(NodeRequestHandler):
+class KubernetesNodeRequestHandler(SimpleTaskManagerHandler):
     log = logging.getLogger("nodepool.driver.kubernetes."
                             "KubernetesNodeRequestHandler")
-
-    def __init__(self, pw, request):
-        super().__init__(pw, request)
-        self._threads = []
-
-    @property
-    def alive_thread_count(self):
-        count = 0
-        for t in self._threads:
-            if t.isAlive():
-                count += 1
-        return count
-
-    def imagesAvailable(self):
-        return True
-
-    def launchesComplete(self):
-        '''
-        Check if all launch requests have completed.
-
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
-        '''
-        if not self._threads:
-            return True
-
-        # Give the NodeLaunch threads time to finish.
-        if self.alive_thread_count:
-            return False
-
-        node_states = [node.state for node in self.nodeset]
-
-        # NOTE: It very important that NodeLauncher always sets one of
-        # these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
-            return False
-
-        return True
-
-    def launch(self, node):
-        label = self.pool.labels[node.type[0]]
-        thd = K8SLauncher(self, node, self.provider, label)
-        thd.start()
-        self._threads.append(thd)
+    launcher = K8SLauncher
diff --git a/nodepool/driver/kubernetes/provider.py b/nodepool/driver/kubernetes/provider.py
index f751a0bb4..f56512707 100644
--- a/nodepool/driver/kubernetes/provider.py
+++ b/nodepool/driver/kubernetes/provider.py
@@ -14,6 +14,7 @@
 
 import base64
 import logging
+import math
 import urllib3
 import time
 
@@ -24,15 +25,18 @@ from openshift import config
 from nodepool import exceptions
 from nodepool.driver import Provider
 from nodepool.driver.kubernetes import handler
+from nodepool.driver.utils import QuotaInformation, QuotaSupport
 
 urllib3.disable_warnings()
 
 
-class KubernetesProvider(Provider):
+class KubernetesProvider(Provider, QuotaSupport):
     log = logging.getLogger("nodepool.driver.kubernetes.KubernetesProvider")
 
     def __init__(self, provider, *args):
+        super().__init__()
         self.provider = provider
+        self._zk = None
         self.ready = False
         try:
             self.k8s_client, self.rbac_client = self._get_client(
@@ -63,6 +67,7 @@ class KubernetesProvider(Provider):
 
     def start(self, zk_conn):
         self.log.debug("Starting")
+        self._zk = zk_conn
         if self.ready or not self.k8s_client or not self.rbac_client:
             return
         self.ready = True
@@ -318,3 +323,19 @@ class KubernetesProvider(Provider):
 
     def getRequestHandler(self, poolworker, request):
         return handler.KubernetesNodeRequestHandler(poolworker, request)
+
+    def getProviderLimits(self):
+        # TODO: query the api to get real limits
+        return QuotaInformation(
+            cores=math.inf,
+            instances=math.inf,
+            ram=math.inf,
+            default=math.inf)
+
+    def quotaNeededByLabel(self, ntype, pool):
+        # TODO: return real quota information about a label
+        return QuotaInformation(cores=1, instances=1, ram=1, default=1)
+
+    def unmanagedQuotaUsed(self):
+        # TODO: return real quota information about unmanaged quota
+        return QuotaInformation()
diff --git a/nodepool/driver/simple.py b/nodepool/driver/simple.py
index 63b5303f5..6429a761b 100644
--- a/nodepool/driver/simple.py
+++ b/nodepool/driver/simple.py
@@ -145,6 +145,7 @@ class SimpleTaskManagerLauncher(NodeLauncher):
 class SimpleTaskManagerHandler(NodeRequestHandler):
     log = logging.getLogger("nodepool.driver.simple."
                             "SimpleTaskManagerHandler")
+    launcher = SimpleTaskManagerLauncher
 
     def __init__(self, pw, request):
         super().__init__(pw, request)
@@ -230,8 +231,8 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
         '''
         Check if all launch requests have completed.
 
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
+        When all of the Node objects have reached a final state (READY, FAILED
+        or ABORTED), we'll know all threads have finished the launch process.
         '''
         if not self._threads:
             return True
@@ -244,14 +245,15 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
 
         # NOTE: It is very important that NodeLauncher always sets one
         # of these states, no matter what.
- if not all(s in (zk.READY, zk.FAILED) for s in node_states): + if not all(s in (zk.READY, zk.FAILED, zk.ABORTED) + for s in node_states): return False return True def launch(self, node): label = self.pool.labels[node.type[0]] - thd = SimpleTaskManagerLauncher(self, node, self.provider, label) + thd = self.launcher(self, node, self.provider, label) thd.start() self._threads.append(thd) diff --git a/nodepool/tests/fixtures/functional/kubernetes/basic.yaml b/nodepool/tests/fixtures/functional/kubernetes/basic.yaml index d830c177d..4013cfffe 100644 --- a/nodepool/tests/fixtures/functional/kubernetes/basic.yaml +++ b/nodepool/tests/fixtures/functional/kubernetes/basic.yaml @@ -15,6 +15,7 @@ providers: context: minikube pools: - name: main + max-servers: 2 labels: - name: kubernetes-namespace type: namespace diff --git a/nodepool/tests/fixtures/kubernetes.yaml b/nodepool/tests/fixtures/kubernetes.yaml index 27818c4a6..579a7be6b 100644 --- a/nodepool/tests/fixtures/kubernetes.yaml +++ b/nodepool/tests/fixtures/kubernetes.yaml @@ -13,6 +13,7 @@ providers: context: admin-cluster.local pools: - name: main + max-servers: 2 node-attributes: key1: value1 key2: value2 diff --git a/nodepool/tests/unit/test_driver_kubernetes.py b/nodepool/tests/unit/test_driver_kubernetes.py index 283477177..edfd34852 100644 --- a/nodepool/tests/unit/test_driver_kubernetes.py +++ b/nodepool/tests/unit/test_driver_kubernetes.py @@ -15,6 +15,7 @@ import fixtures import logging +import time from nodepool import tests from nodepool import zk @@ -155,3 +156,45 @@ class TestDriverKubernetes(tests.DBTestCase): self.zk.storeNode(node) self.waitForNodeDeletion(node) + + def test_kubernetes_max_servers(self): + configfile = self.setup_config('kubernetes.yaml') + pool = self.useNodepool(configfile, watermark_sleep=1) + pool.start() + # Start two pods to hit max-server limit + reqs = [] + for x in [1, 2]: + req = zk.NodeRequest() + req.state = zk.REQUESTED + req.node_types.append('pod-fedora') + self.zk.storeNodeRequest(req) + reqs.append(req) + + fulfilled_reqs = [] + for req in reqs: + self.log.debug("Waiting for request %s", req.id) + r = self.waitForNodeRequest(req) + self.assertEqual(r.state, zk.FULFILLED) + fulfilled_reqs.append(r) + + # Now request a third pod that will hit the limit + max_req = zk.NodeRequest() + max_req.state = zk.REQUESTED + max_req.node_types.append('pod-fedora') + self.zk.storeNodeRequest(max_req) + + # The previous request should pause the handler + pool_worker = pool.getPoolWorkers('kubespray') + while not pool_worker[0].paused_handler: + time.sleep(0.1) + + # Delete the earlier two pods freeing space for the third. + for req in fulfilled_reqs: + node = self.zk.getNode(req.nodes[0]) + node.state = zk.DELETING + self.zk.storeNode(node) + self.waitForNodeDeletion(node) + + # We should unpause and fulfill this now + req = self.waitForNodeRequest(max_req) + self.assertEqual(req.state, zk.FULFILLED)
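
For context on how the pieces above combine, here is a rough standalone sketch
of the quota arithmetic this change relies on. It deliberately does not use
nodepool's real QuotaInformation or QuotaSupport classes; FakeQuota and
pool_has_room are invented for illustration only. The provider reports
unlimited cores, ram and instances, each requested label charges one instance,
and every node corresponds to one namespace, so the pool's max-servers value
becomes the only binding cap on concurrent namespaces/pods.

import math


class FakeQuota:
    """Illustrative stand-in for a quota record, reduced to the single
    'instances' dimension that matters for this change."""

    def __init__(self, instances):
        self.instances = instances

    def subtract(self, other):
        self.instances -= other.instances

    def non_negative(self):
        return self.instances >= 0


def pool_has_room(max_servers, nodes_in_use, requested_nodes):
    """Return True if the request fits under both the (infinite) provider
    limit and the pool's max-servers cap."""
    provider_limit = FakeQuota(instances=math.inf)  # cf. getProviderLimits()
    pool_limit = FakeQuota(instances=max_servers)   # pool's max-servers

    # Each requested label costs one instance, cf. quotaNeededByLabel().
    needed = FakeQuota(instances=len(requested_nodes))
    # Every existing node maps to one namespace, so usage is just a count.
    used = FakeQuota(instances=len(nodes_in_use))

    for limit in (provider_limit, pool_limit):
        limit.subtract(used)
        limit.subtract(needed)
        if not limit.non_negative():
            return False
    return True


if __name__ == "__main__":
    # Mirrors test_kubernetes_max_servers: with max-servers: 2, two
    # fulfilled namespaces leave no room for a third pod request...
    print(pool_has_room(2, ["ns-1", "ns-2"], ["pod-fedora"]))  # False
    # ...until one of them is deleted.
    print(pool_has_room(2, ["ns-1"], ["pod-fedora"]))          # True

Once getProviderLimits() is taught to report real cluster capacity (per the
TODOs above), the same check would start constraining on cores and ram as
well, not just on the namespace count.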