diff --git a/nodepool/driver/kubernetes/handler.py b/nodepool/driver/kubernetes/handler.py
index 17c1251ea..02474f309 100644
--- a/nodepool/driver/kubernetes/handler.py
+++ b/nodepool/driver/kubernetes/handler.py
@@ -17,8 +17,8 @@ import logging
 from kazoo import exceptions as kze
 
 from nodepool import zk
+from nodepool.driver.simple import SimpleTaskManagerHandler
 from nodepool.driver.utils import NodeLauncher
-from nodepool.driver import NodeRequestHandler
 
 
 class K8SLauncher(NodeLauncher):
@@ -76,50 +76,7 @@ class K8SLauncher(NodeLauncher):
                 attempts += 1
 
 
-class KubernetesNodeRequestHandler(NodeRequestHandler):
+class KubernetesNodeRequestHandler(SimpleTaskManagerHandler):
     log = logging.getLogger("nodepool.driver.kubernetes."
                             "KubernetesNodeRequestHandler")
-
-    def __init__(self, pw, request):
-        super().__init__(pw, request)
-        self._threads = []
-
-    @property
-    def alive_thread_count(self):
-        count = 0
-        for t in self._threads:
-            if t.isAlive():
-                count += 1
-        return count
-
-    def imagesAvailable(self):
-        return True
-
-    def launchesComplete(self):
-        '''
-        Check if all launch requests have completed.
-
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
-        '''
-        if not self._threads:
-            return True
-
-        # Give the NodeLaunch threads time to finish.
-        if self.alive_thread_count:
-            return False
-
-        node_states = [node.state for node in self.nodeset]
-
-        # NOTE: It very important that NodeLauncher always sets one of
-        # these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
-            return False
-
-        return True
-
-    def launch(self, node):
-        label = self.pool.labels[node.type[0]]
-        thd = K8SLauncher(self, node, self.provider, label)
-        thd.start()
-        self._threads.append(thd)
+    launcher = K8SLauncher
diff --git a/nodepool/driver/kubernetes/provider.py b/nodepool/driver/kubernetes/provider.py
index b4d4c9f57..3395ae89f 100644
--- a/nodepool/driver/kubernetes/provider.py
+++ b/nodepool/driver/kubernetes/provider.py
@@ -14,6 +14,7 @@
 
 import base64
 import logging
+import math
 import urllib3
 import time
 
@@ -24,15 +25,18 @@ from openshift import config
 from nodepool import exceptions
 from nodepool.driver import Provider
 from nodepool.driver.kubernetes import handler
+from nodepool.driver.utils import QuotaInformation, QuotaSupport
 
 urllib3.disable_warnings()
 
 
-class KubernetesProvider(Provider):
+class KubernetesProvider(Provider, QuotaSupport):
     log = logging.getLogger("nodepool.driver.kubernetes.KubernetesProvider")
 
     def __init__(self, provider, *args):
+        super().__init__()
         self.provider = provider
+        self._zk = None
         self.ready = False
         try:
             self.k8s_client, self.rbac_client = self._get_client(
@@ -63,6 +67,7 @@ class KubernetesProvider(Provider):
 
     def start(self, zk_conn):
         self.log.debug("Starting")
+        self._zk = zk_conn
         if self.ready or not self.k8s_client or not self.rbac_client:
             return
         self.ready = True
@@ -319,3 +324,19 @@ class KubernetesProvider(Provider):
 
     def getRequestHandler(self, poolworker, request):
         return handler.KubernetesNodeRequestHandler(poolworker, request)
+
+    def getProviderLimits(self):
+        # TODO: query the api to get real limits
+        return QuotaInformation(
+            cores=math.inf,
+            instances=math.inf,
+            ram=math.inf,
+            default=math.inf)
+
+    def quotaNeededByLabel(self, ntype, pool):
+        # TODO: return real quota information about a label
+        return QuotaInformation(cores=1, instances=1, ram=1, default=1)
+
+    def unmanagedQuotaUsed(self):
+        # TODO: return real quota information about quota
+        return QuotaInformation()
diff --git a/nodepool/driver/simple.py b/nodepool/driver/simple.py
index 63b5303f5..6429a761b 100644
--- a/nodepool/driver/simple.py
+++ b/nodepool/driver/simple.py
@@ -145,6 +145,7 @@ class SimpleTaskManagerLauncher(NodeLauncher):
 class SimpleTaskManagerHandler(NodeRequestHandler):
     log = logging.getLogger("nodepool.driver.simple."
                             "SimpleTaskManagerHandler")
+    launcher = SimpleTaskManagerLauncher
 
     def __init__(self, pw, request):
         super().__init__(pw, request)
         self._threads = []
@@ -230,8 +231,8 @@
         '''
         Check if all launch requests have completed.
 
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
+        When all of the Node objects have reached a final state (READY, FAILED
+        or ABORTED), we'll know all threads have finished the launch process.
         '''
         if not self._threads:
             return True
@@ -244,14 +245,15 @@
 
         # NOTE: It is very important that NodeLauncher always sets one
         # of these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
+        if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
+                   for s in node_states):
             return False
 
         return True
 
     def launch(self, node):
         label = self.pool.labels[node.type[0]]
-        thd = SimpleTaskManagerLauncher(self, node, self.provider, label)
+        thd = self.launcher(self, node, self.provider, label)
         thd.start()
         self._threads.append(thd)
 
diff --git a/nodepool/tests/fixtures/functional/kubernetes/basic.yaml b/nodepool/tests/fixtures/functional/kubernetes/basic.yaml
index d830c177d..4013cfffe 100644
--- a/nodepool/tests/fixtures/functional/kubernetes/basic.yaml
+++ b/nodepool/tests/fixtures/functional/kubernetes/basic.yaml
@@ -15,6 +15,7 @@ providers:
     context: minikube
     pools:
       - name: main
+        max-servers: 2
         labels:
           - name: kubernetes-namespace
             type: namespace
diff --git a/nodepool/tests/fixtures/kubernetes.yaml b/nodepool/tests/fixtures/kubernetes.yaml
index 27818c4a6..579a7be6b 100644
--- a/nodepool/tests/fixtures/kubernetes.yaml
+++ b/nodepool/tests/fixtures/kubernetes.yaml
@@ -13,6 +13,7 @@ providers:
     context: admin-cluster.local
     pools:
      - name: main
+        max-servers: 2
         node-attributes:
           key1: value1
           key2: value2
diff --git a/nodepool/tests/unit/test_driver_kubernetes.py b/nodepool/tests/unit/test_driver_kubernetes.py
index 283477177..edfd34852 100644
--- a/nodepool/tests/unit/test_driver_kubernetes.py
+++ b/nodepool/tests/unit/test_driver_kubernetes.py
@@ -15,6 +15,7 @@
 
 import fixtures
 import logging
+import time
 
 from nodepool import tests
 from nodepool import zk
@@ -155,3 +156,45 @@ class TestDriverKubernetes(tests.DBTestCase):
         self.zk.storeNode(node)
 
         self.waitForNodeDeletion(node)
+
+    def test_kubernetes_max_servers(self):
+        configfile = self.setup_config('kubernetes.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+        # Start two pods to hit max-server limit
+        reqs = []
+        for x in [1, 2]:
+            req = zk.NodeRequest()
+            req.state = zk.REQUESTED
+            req.node_types.append('pod-fedora')
+            self.zk.storeNodeRequest(req)
+            reqs.append(req)
+
+        fulfilled_reqs = []
+        for req in reqs:
+            self.log.debug("Waiting for request %s", req.id)
+            r = self.waitForNodeRequest(req)
+            self.assertEqual(r.state, zk.FULFILLED)
+            fulfilled_reqs.append(r)
+
+        # Now request a third pod that will hit the limit
+        max_req = zk.NodeRequest()
+        max_req.state = zk.REQUESTED
+        max_req.node_types.append('pod-fedora')
+        self.zk.storeNodeRequest(max_req)
+
+        # The previous request should pause the handler
+        pool_worker = pool.getPoolWorkers('kubespray')
+        while not pool_worker[0].paused_handler:
+            time.sleep(0.1)
+
+        # Delete the earlier two pods freeing space for the third.
+        for req in fulfilled_reqs:
+            node = self.zk.getNode(req.nodes[0])
+            node.state = zk.DELETING
+            self.zk.storeNode(node)
+            self.waitForNodeDeletion(node)
+
+        # We should unpause and fulfill this now
+        req = self.waitForNodeRequest(max_req)
+        self.assertEqual(req.state, zk.FULFILLED)
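
A note on the handler refactor above: the point of the new ``launcher`` class
attribute is that a driver-specific handler now only names its NodeLauncher
subclass and inherits all of the thread and state bookkeeping from
SimpleTaskManagerHandler, exactly as the Kubernetes handler does with
``launcher = K8SLauncher``. A minimal sketch of the pattern for a hypothetical
driver (the ``Example*`` names are made up, and the ``(handler, node)``
super() call is an assumption about NodeLauncher's constructor)::

    import logging

    from nodepool.driver.simple import SimpleTaskManagerHandler
    from nodepool.driver.utils import NodeLauncher


    class ExampleLauncher(NodeLauncher):
        # SimpleTaskManagerHandler.launch() instantiates the launcher as
        # self.launcher(self, node, self.provider, label), so the subclass
        # accepts (handler, node, provider_config, provider_label).
        def __init__(self, handler, node, provider_config, provider_label):
            super().__init__(handler, node)
            self.provider_config = provider_config
            self.label = provider_label

        def launch(self):
            # A real driver creates its backend resource here and leaves the
            # node READY; failures must end up FAILED or ABORTED so that
            # launchesComplete() can finish.
            raise NotImplementedError


    class ExampleHandler(SimpleTaskManagerHandler):
        log = logging.getLogger("nodepool.driver.example.ExampleHandler")
        launcher = ExampleLauncher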
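
Similarly, the quota stubs added to KubernetesProvider are what make the new
``max-servers`` pool setting bite: provider-level limits are reported as
unlimited and every label is costed at one instance, so only the pool's
``max-servers`` cap can run out, which is what the paused-handler test above
exercises. A rough, stand-alone illustration of those values (the
``subtract()`` helper on QuotaInformation is assumed here, it is not part of
this change)::

    import math

    from nodepool.driver.utils import QuotaInformation

    # What getProviderLimits() reports: unlimited, until the TODO to query
    # the Kubernetes API for real limits is implemented.
    available = QuotaInformation(cores=math.inf, instances=math.inf,
                                 ram=math.inf, default=math.inf)

    # What quotaNeededByLabel() reports: each pod/namespace costs one
    # instance, one core and one unit of RAM.
    per_label = QuotaInformation(cores=1, instances=1, ram=1, default=1)

    # Subtracting per-label needs can never exhaust an infinite provider
    # quota, so the pool-level max-servers value is the effective limit.
    available.subtract(per_label)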