Merge "Add basic max-servers handling to the k8s driver"
commit 6ebac5fd17
nodepool/driver/kubernetes/handler.py

@@ -17,8 +17,8 @@ import logging
 from kazoo import exceptions as kze

 from nodepool import zk
+from nodepool.driver.simple import SimpleTaskManagerHandler
 from nodepool.driver.utils import NodeLauncher
-from nodepool.driver import NodeRequestHandler


 class K8SLauncher(NodeLauncher):
@@ -76,50 +76,7 @@ class K8SLauncher(NodeLauncher):
                 attempts += 1


-class KubernetesNodeRequestHandler(NodeRequestHandler):
+class KubernetesNodeRequestHandler(SimpleTaskManagerHandler):
     log = logging.getLogger("nodepool.driver.kubernetes."
                             "KubernetesNodeRequestHandler")
-
-    def __init__(self, pw, request):
-        super().__init__(pw, request)
-        self._threads = []
-
-    @property
-    def alive_thread_count(self):
-        count = 0
-        for t in self._threads:
-            if t.isAlive():
-                count += 1
-        return count
-
-    def imagesAvailable(self):
-        return True
-
-    def launchesComplete(self):
-        '''
-        Check if all launch requests have completed.
-
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
-        '''
-        if not self._threads:
-            return True
-
-        # Give the NodeLaunch threads time to finish.
-        if self.alive_thread_count:
-            return False
-
-        node_states = [node.state for node in self.nodeset]
-
-        # NOTE: It very important that NodeLauncher always sets one of
-        # these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
-            return False
-
-        return True
-
-    def launch(self, node):
-        label = self.pool.labels[node.type[0]]
-        thd = K8SLauncher(self, node, self.provider, label)
-        thd.start()
-        self._threads.append(thd)
+    launcher = K8SLauncher
nodepool/driver/kubernetes/provider.py

@@ -14,6 +14,7 @@

 import base64
 import logging
+import math
 import urllib3
 import time

@@ -24,15 +25,18 @@ from openshift import config
 from nodepool import exceptions
 from nodepool.driver import Provider
 from nodepool.driver.kubernetes import handler
+from nodepool.driver.utils import QuotaInformation, QuotaSupport

 urllib3.disable_warnings()


-class KubernetesProvider(Provider):
+class KubernetesProvider(Provider, QuotaSupport):
     log = logging.getLogger("nodepool.driver.kubernetes.KubernetesProvider")

     def __init__(self, provider, *args):
+        super().__init__()
         self.provider = provider
+        self._zk = None
         self.ready = False
         try:
             self.k8s_client, self.rbac_client = self._get_client(
@@ -63,6 +67,7 @@ class KubernetesProvider(Provider):

     def start(self, zk_conn):
         self.log.debug("Starting")
+        self._zk = zk_conn
         if self.ready or not self.k8s_client or not self.rbac_client:
             return
         self.ready = True
@@ -319,3 +324,19 @@ class KubernetesProvider(Provider):

     def getRequestHandler(self, poolworker, request):
         return handler.KubernetesNodeRequestHandler(poolworker, request)
+
+    def getProviderLimits(self):
+        # TODO: query the api to get real limits
+        return QuotaInformation(
+            cores=math.inf,
+            instances=math.inf,
+            ram=math.inf,
+            default=math.inf)
+
+    def quotaNeededByLabel(self, ntype, pool):
+        # TODO: return real quota information about a label
+        return QuotaInformation(cores=1, instances=1, ram=1, default=1)
+
+    def unmanagedQuotaUsed(self):
+        # TODO: return real quota information about quota
+        return QuotaInformation()
nodepool/driver/simple.py

@@ -145,6 +145,7 @@ class SimpleTaskManagerLauncher(NodeLauncher):
 class SimpleTaskManagerHandler(NodeRequestHandler):
     log = logging.getLogger("nodepool.driver.simple."
                             "SimpleTaskManagerHandler")
+    launcher = SimpleTaskManagerLauncher

     def __init__(self, pw, request):
         super().__init__(pw, request)
@@ -230,8 +231,8 @@ class SimpleTaskManagerHandler(NodeRequestHandler):
         '''
         Check if all launch requests have completed.

-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
+        When all of the Node objects have reached a final state (READY, FAILED
+        or ABORTED), we'll know all threads have finished the launch process.
         '''
         if not self._threads:
             return True
@@ -244,14 +245,15 @@ class SimpleTaskManagerHandler(NodeRequestHandler):

         # NOTE: It is very important that NodeLauncher always sets one
         # of these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
+        if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
+                   for s in node_states):
             return False

         return True

     def launch(self, node):
         label = self.pool.labels[node.type[0]]
-        thd = SimpleTaskManagerLauncher(self, node, self.provider, label)
+        thd = self.launcher(self, node, self.provider, label)
         thd.start()
         self._threads.append(thd)
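The simple.py change above replaces the hard-coded SimpleTaskManagerLauncher with a launcher class attribute, which is what lets KubernetesNodeRequestHandler reuse the whole handler by setting launcher = K8SLauncher. A minimal, generic sketch of that hook pattern (illustrative class names, not nodepool code):

import threading


class BaseHandler:
    # Subclasses point this at a Thread subclass; launch() stays generic.
    launcher = None

    def __init__(self):
        self._threads = []

    def launch(self, node):
        thd = self.launcher(node)   # resolved on the concrete subclass
        thd.start()
        self._threads.append(thd)


class EchoLauncher(threading.Thread):
    def __init__(self, node):
        super().__init__()
        self.node = node

    def run(self):
        print("launching", self.node)


class EchoHandler(BaseHandler):
    launcher = EchoLauncher   # the only driver-specific piece


EchoHandler().launch("node-0000000001")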
@@ -15,6 +15,7 @@ providers:
     context: minikube
     pools:
       - name: main
+        max-servers: 2
         labels:
           - name: kubernetes-namespace
             type: namespace
nodepool/tests/fixtures/kubernetes.yaml

@@ -13,6 +13,7 @@ providers:
     context: admin-cluster.local
     pools:
       - name: main
+        max-servers: 2
         node-attributes:
           key1: value1
           key2: value2
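Both fixture hunks add max-servers: 2 to the pool definition. A quick way to see the parsed shape of that knob (plain PyYAML on an inline copy of the fixture snippet; the fall-back to an unbounded value is an assumption for illustration, not nodepool's config loader):

import math

import yaml  # PyYAML

POOL_YAML = """
pools:
  - name: main
    max-servers: 2
    labels:
      - name: kubernetes-namespace
        type: namespace
"""

pool = yaml.safe_load(POOL_YAML)["pools"][0]
# Treat a missing max-servers as "no pool-level cap" in this sketch.
max_servers = pool.get("max-servers", math.inf)
print(pool["name"], max_servers)   # -> main 2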
nodepool/tests/test_driver_kubernetes.py

@@ -15,6 +15,7 @@

 import fixtures
 import logging
+import time

 from nodepool import tests
 from nodepool import zk
@@ -155,3 +156,45 @@ class TestDriverKubernetes(tests.DBTestCase):
         self.zk.storeNode(node)

         self.waitForNodeDeletion(node)
+
+    def test_kubernetes_max_servers(self):
+        configfile = self.setup_config('kubernetes.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+        # Start two pods to hit max-server limit
+        reqs = []
+        for x in [1, 2]:
+            req = zk.NodeRequest()
+            req.state = zk.REQUESTED
+            req.node_types.append('pod-fedora')
+            self.zk.storeNodeRequest(req)
+            reqs.append(req)
+
+        fulfilled_reqs = []
+        for req in reqs:
+            self.log.debug("Waiting for request %s", req.id)
+            r = self.waitForNodeRequest(req)
+            self.assertEqual(r.state, zk.FULFILLED)
+            fulfilled_reqs.append(r)
+
+        # Now request a third pod that will hit the limit
+        max_req = zk.NodeRequest()
+        max_req.state = zk.REQUESTED
+        max_req.node_types.append('pod-fedora')
+        self.zk.storeNodeRequest(max_req)
+
+        # The previous request should pause the handler
+        pool_worker = pool.getPoolWorkers('kubespray')
+        while not pool_worker[0].paused_handler:
+            time.sleep(0.1)
+
+        # Delete the earlier two pods freeing space for the third.
+        for req in fulfilled_reqs:
+            node = self.zk.getNode(req.nodes[0])
+            node.state = zk.DELETING
+            self.zk.storeNode(node)
+            self.waitForNodeDeletion(node)
+
+        # We should unpause and fulfill this now
+        req = self.waitForNodeRequest(max_req)
+        self.assertEqual(req.state, zk.FULFILLED)