Add basic max-servers handling to the k8s driver

This maps zk "nodes" to k8s resources, which are currently pods or
namespaces, in order to count them and apply max-servers limits. Since
a pod always lives in a namespace, we have essentially mapped "nodes"
to k8s namespaces. This limitation is quite basic: jobs within a
namespace can still, in theory, consume an arbitrary amount of
resources, but if you control the jobs and limit the number of
concurrent jobs via max-servers (that is, max namespaces) you get some
measure of control.
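
As a rough illustration of that accounting (a sketch only, with
made-up names and numbers, not code from this change): each label
costs one "instance", so max-servers acts as a cap on concurrent
namespaces.

  import math

  def can_launch(active_namespaces, max_servers=None):
      # Provider-side limits are effectively unlimited (the provider
      # reports math.inf), so the pool's max-servers is the only bound.
      limit = max_servers if max_servers is not None else math.inf
      return active_namespaces + 1 <= limit

  # With max-servers: 2, a third concurrent request has to wait:
  assert can_launch(1, max_servers=2)
  assert not can_launch(2, max_servers=2)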

To make this work we adapt the simple driver's handler class for use
by the kubernetes driver. In particular we provide a way to override
the launcher class via a class attribute. Otherwise the code needed
for the quota checks is identical, so we reuse it.
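
The shape of that override is roughly the following (a simplified
sketch with invented class names, not the actual nodepool classes):

  class BaseHandler:
      # Subclasses point this attribute at their launcher class
      # instead of overriding launch() itself.
      launcher = None

      def launch(self, node):
          thread = self.launcher(self, node)
          thread.start()
          return thread

  class FakeLauncher:
      def __init__(self, handler, node):
          self.node = node

      def start(self):
          print("launching", self.node)

  class K8sStyleHandler(BaseHandler):
      launcher = FakeLauncher

  K8sStyleHandler().launch("0000000001")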

Change-Id: Ieb9f179d99b322a9cbf4bc19d4f495bcda0d1ea8
Clark Boylan 2020-07-27 14:40:21 -07:00
parent 898bfe1c6f
commit 775b833617
6 changed files with 76 additions and 51 deletions

View File

@@ -17,8 +17,8 @@ import logging
from kazoo import exceptions as kze
from nodepool import zk
from nodepool.driver.simple import SimpleTaskManagerHandler
from nodepool.driver.utils import NodeLauncher
from nodepool.driver import NodeRequestHandler
class K8SLauncher(NodeLauncher):
@@ -76,50 +76,7 @@ class K8SLauncher(NodeLauncher):
attempts += 1
class KubernetesNodeRequestHandler(NodeRequestHandler):
class KubernetesNodeRequestHandler(SimpleTaskManagerHandler):
log = logging.getLogger("nodepool.driver.kubernetes."
"KubernetesNodeRequestHandler")
def __init__(self, pw, request):
super().__init__(pw, request)
self._threads = []
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.isAlive():
count += 1
return count
def imagesAvailable(self):
return True
def launchesComplete(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False
return True
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = K8SLauncher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)
launcher = K8SLauncher

View File

@@ -14,6 +14,7 @@
import base64
import logging
import math
import urllib3
import time
@@ -24,15 +25,18 @@ from openshift import config
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.kubernetes import handler
from nodepool.driver.utils import QuotaInformation, QuotaSupport
urllib3.disable_warnings()
class KubernetesProvider(Provider):
class KubernetesProvider(Provider, QuotaSupport):
log = logging.getLogger("nodepool.driver.kubernetes.KubernetesProvider")
def __init__(self, provider, *args):
super().__init__()
self.provider = provider
self._zk = None
self.ready = False
try:
self.k8s_client, self.rbac_client = self._get_client(
@@ -63,6 +67,7 @@ class KubernetesProvider(Provider):
def start(self, zk_conn):
self.log.debug("Starting")
self._zk = zk_conn
if self.ready or not self.k8s_client or not self.rbac_client:
return
self.ready = True
@@ -318,3 +323,19 @@ class KubernetesProvider(Provider):
def getRequestHandler(self, poolworker, request):
return handler.KubernetesNodeRequestHandler(poolworker, request)
def getProviderLimits(self):
# TODO: query the api to get real limits
return QuotaInformation(
cores=math.inf,
instances=math.inf,
ram=math.inf,
default=math.inf)
def quotaNeededByLabel(self, ntype, pool):
# TODO: return real quota information about a label
return QuotaInformation(cores=1, instances=1, ram=1, default=1)
def unmanagedQuotaUsed(self):
# TODO: return real quota information about quota
return QuotaInformation()
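
Taken together these stubs make the generic quota calculation collapse
to the max-servers check. A rough sketch of the arithmetic (assumed
semantics and illustrative numbers, not nodepool's actual quota code):

  import math

  provider_instances = math.inf  # getProviderLimits() reports no real limit
  label_cost = 1                 # quotaNeededByLabel() charges one instance
  max_servers = 2                # pool setting, as in the fixtures below
  in_use = 2                     # e.g. two fulfilled pod-fedora nodes

  remaining = min(provider_instances, max_servers) - in_use
  print(remaining >= label_cost)  # False -> the handler pauses the request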

View File

@@ -145,6 +145,7 @@ class SimpleTaskManagerLauncher(NodeLauncher):
class SimpleTaskManagerHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.simple."
"SimpleTaskManagerHandler")
launcher = SimpleTaskManagerLauncher
def __init__(self, pw, request):
super().__init__(pw, request)
@@ -230,8 +231,8 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
When all of the Node objects have reached a final state (READY, FAILED
or ABORTED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
@@ -244,14 +245,15 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
# NOTE: It is very important that NodeLauncher always sets one
# of these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
for s in node_states):
return False
return True
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = SimpleTaskManagerLauncher(self, node, self.provider, label)
thd = self.launcher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)

View File

@@ -15,6 +15,7 @@ providers:
context: minikube
pools:
- name: main
max-servers: 2
labels:
- name: kubernetes-namespace
type: namespace

View File

@@ -13,6 +13,7 @@ providers:
context: admin-cluster.local
pools:
- name: main
max-servers: 2
node-attributes:
key1: value1
key2: value2

View File

@@ -15,6 +15,7 @@
import fixtures
import logging
import time
from nodepool import tests
from nodepool import zk
@@ -155,3 +156,45 @@ class TestDriverKubernetes(tests.DBTestCase):
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
def test_kubernetes_max_servers(self):
configfile = self.setup_config('kubernetes.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
# Start two pods to hit max-server limit
reqs = []
for x in [1, 2]:
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('pod-fedora')
self.zk.storeNodeRequest(req)
reqs.append(req)
fulfilled_reqs = []
for req in reqs:
self.log.debug("Waiting for request %s", req.id)
r = self.waitForNodeRequest(req)
self.assertEqual(r.state, zk.FULFILLED)
fulfilled_reqs.append(r)
# Now request a third pod that will hit the limit
max_req = zk.NodeRequest()
max_req.state = zk.REQUESTED
max_req.node_types.append('pod-fedora')
self.zk.storeNodeRequest(max_req)
# The previous request should pause the handler
pool_worker = pool.getPoolWorkers('kubespray')
while not pool_worker[0].paused_handler:
time.sleep(0.1)
# Delete the earlier two pods freeing space for the third.
for req in fulfilled_reqs:
node = self.zk.getNode(req.nodes[0])
node.state = zk.DELETING
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
# We should unpause and fulfill this now
req = self.waitForNodeRequest(max_req)
self.assertEqual(req.state, zk.FULFILLED)