Add "slots" to static node driver
Add persistent slot numbers for static nodes. This facilitates avoiding
workspace collisions on nodes with max-parallel-jobs > 1.

Change-Id: I30bbfc79a60b9e15f1255ad001a879521a181294
parent 04e5b2c2c6
commit 08fdeed241
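The idea, in brief: every registration of the same static host gets a stable
slot number in the range 0..max-parallel-jobs-1, stored on the ZooKeeper Node
record. A consumer of the node can then key per-job state, such as a build
workspace, on the slot instead of on the shared hostname. The sketch below is
illustrative only and is not part of this change; the helper name and path
layout are assumptions.

    # Illustrative sketch (not part of this change): using the new "slot"
    # field to keep workspaces separate on a host with max-parallel-jobs > 1.
    import os

    def workspace_for(node, base="/var/lib/builds"):
        # Registrations of the same static host share a hostname but carry
        # distinct, persistent slot numbers, so two concurrent jobs on that
        # host never land in the same directory.
        slot = node.slot if node.slot is not None else 0
        return os.path.join(base, node.hostname, "slot-%d" % slot)
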
@@ -17,7 +17,6 @@ import logging
import math
import threading
from concurrent.futures.thread import ThreadPoolExecutor

from collections import Counter, namedtuple

from nodepool import exceptions
@@ -33,15 +32,16 @@ class StaticNodeError(Exception):
    pass


Node = namedtuple("Node", ["hostname", "username", "port"])
NodeTuple = namedtuple("NodeTuple", ["hostname", "username", "port"])


def nodeTuple(node):
    """Return an unique identifier tuple for a static node"""
    if isinstance(node, dict):
        return Node(node["name"], node["username"], node["connection-port"])
        return NodeTuple(node["name"], node["username"],
                         node["connection-port"])
    else:
        return Node(node.hostname, node.username, node.connection_port)
        return NodeTuple(node.hostname, node.username, node.connection_port)


class StaticNodeProvider(Provider, QuotaSupport):
@@ -58,34 +58,39 @@ class StaticNodeProvider(Provider, QuotaSupport):
        # Lock to avoid data races when registering nodes from
        # multiple threads (e.g. cleanup and deleted node worker).
        self._register_lock = threading.Lock()
        self._node_slots = {}  # nodeTuple -> [node]

    def checkHost(self, node):
    def _getSlot(self, node):
        return self._node_slots[nodeTuple(node)].index(node)

    def checkHost(self, static_node):
        '''Check node is reachable'''
        # only gather host keys if the connection type is ssh or network_cli
        gather_hostkeys = (
            node["connection-type"] == 'ssh' or
            node["connection-type"] == 'network_cli')
        if gather_hostkeys and not node.get('host-key-checking', True):
            return node['host-key']
            static_node["connection-type"] == 'ssh' or
            static_node["connection-type"] == 'network_cli')
        if gather_hostkeys and not static_node.get('host-key-checking', True):
            return static_node['host-key']
        try:
            keys = nodeutils.nodescan(node["name"],
                                      port=node["connection-port"],
                                      timeout=node["timeout"],
            keys = nodeutils.nodescan(static_node["name"],
                                      port=static_node["connection-port"],
                                      timeout=static_node["timeout"],
                                      gather_hostkeys=gather_hostkeys)
        except exceptions.ConnectionTimeoutException:
            raise StaticNodeError(
                "{}: ConnectionTimeoutException".format(nodeTuple(node)))
                "{}: ConnectionTimeoutException".format(
                    nodeTuple(static_node)))

        if not gather_hostkeys:
            return []

        # Check node host-key
        if set(node["host-key"]).issubset(set(keys)):
        if set(static_node["host-key"]).issubset(set(keys)):
            return keys

        node_tuple = nodeTuple(node)
        node_tuple = nodeTuple(static_node)
        self.log.debug("%s: Registered key '%s' not in %s",
                       node_tuple, node["host-key"], keys)
                       node_tuple, static_node["host-key"], keys)
        raise StaticNodeError(
            "{}: host key mismatches ({})".format(node_tuple, keys))

@@ -128,12 +133,20 @@ class StaticNodeProvider(Provider, QuotaSupport):
                             node_tuple, exc)

        try:
            self.deregisterNode(count=1, node_tuple=node_tuple)
            self.deregisterNode(node)
        except Exception:
            self.log.exception("Couldn't deregister static node:")

        return False

    def _debugSlots(self, node_slots, unslotted_nodes=None):
        for k, nodes in node_slots.items():
            self.log.debug("Slot status for %s:", k)
            for i, node in enumerate(nodes):
                self.log.debug("Slot %i: %s", i, getattr(node, 'id', None))
        if unslotted_nodes is not None:
            self.log.debug("Unslotted nodes: %s", unslotted_nodes)

    def getRegisteredNodes(self):
        '''
        Get node tuples for all registered static nodes.
@@ -143,50 +156,96 @@ class StaticNodeProvider(Provider, QuotaSupport):
        :returns: A set of registered (hostnames, usernames, ports) tuple for
                  the static driver.
        '''
        registered = Counter()
        unslotted_nodes = []
        node_slots = {}

        # Initialize our slot counters for each node tuple.
        for pool in self.provider.pools.values():
            for static_node in pool.nodes:
                node_slots[nodeTuple(static_node)] = [
                    None for x in range(static_node["max-parallel-jobs"])]

        # Find all nodes with slot ids and store them in node_slots.
        for node in self.zk.nodeIterator():
            if node.provider != self.provider.name:
                continue
            registered.update([nodeTuple(node)])
        return registered
            if node.state in {zk.BUILDING, zk.DELETING}:
                continue
            if nodeTuple(node) in node_slots:
                if (node.slot is not None and
                        len(node_slots[nodeTuple(node)]) > node.slot and
                        node_slots[nodeTuple(node)][node.slot] is None):
                    node_slots[nodeTuple(node)][node.slot] = node
                else:
                    # We have more registered nodes of this type than
                    # slots; there may have been a reduction; track in
                    # order to delete.
                    unslotted_nodes.append(node)
            else:
                # We don't know anything about this node; it may have
                # been removed from the config. We still need to
                # track it in order to decide to delete it.
                node_slots[nodeTuple(node)] = []
                unslotted_nodes.append(node)

    def registerNodeFromConfig(self, count, provider_name, pool,
                               static_node):
        '''
        Register a static node from the config with ZooKeeper.
        # This can be very chatty, so we don't normally log it. It
        # can be helpful when debugging tests.
        # self._debugSlots(node_slots, unslotted_nodes)

        A node can be registered multiple times to support max-parallel-jobs.
        These nodes will share the same node tuple.
        # Find all nodes without slot ids, store each in first available slot
        for node in unslotted_nodes:
            if None in node_slots[nodeTuple(node)]:
                # This is a backwards-compat case; we have room for
                # the node, it's just that the node metadata doesn't
                # have a slot number.
                idx = node_slots[nodeTuple(node)].index(None)
                node_slots[nodeTuple(node)][idx] = node
            else:
                # We have more nodes than expected.
                self.log.warning("Tracking excess node %s as %s slot %s",
                                 node, nodeTuple(node),
                                 len(node_slots[nodeTuple(node)]))
                node_slots[nodeTuple(node)].append(node)
        self._node_slots = node_slots

    def registerNodeFromConfig(self, provider_name, pool, static_node,
                               slot):
        '''Register a static node from the config with ZooKeeper.

        A node can be registered multiple times to support
        max-parallel-jobs. These nodes will share the same node tuple
        but have distinct slot numbers.

        :param int count: Number of times to register this node.
        :param str provider_name: Name of the provider.
        :param str pool: Config of the pool owning the node.
        :param dict static_node: The node definition from the config file.
        :param int slot: The slot number for this node.

        '''
        pool_name = pool.name
        host_keys = self.checkHost(static_node)
        node_tuple = nodeTuple(static_node)

        for i in range(0, count):
            node = zk.Node()
            node.state = zk.READY
            node.provider = provider_name
            node.pool = pool_name
            node.launcher = "static driver"
            node.type = static_node["labels"]
            node.external_id = static_node["name"]
            node.hostname = static_node["name"]
            node.username = static_node["username"]
            node.interface_ip = static_node["name"]
            node.connection_port = static_node["connection-port"]
            node.connection_type = static_node["connection-type"]
            node.python_path = static_node["python-path"]
            node.shell_type = static_node["shell-type"]
            nodeutils.set_node_ip(node)
            node.host_keys = host_keys
            node.attributes = pool.node_attributes
            self.zk.storeNode(node)
            self.log.debug("Registered static node %s", node_tuple)
        node = zk.Node()
        node.state = zk.READY
        node.provider = provider_name
        node.pool = pool_name
        node.launcher = "static driver"
        node.type = static_node["labels"]
        node.external_id = static_node["name"]
        node.hostname = static_node["name"]
        node.username = static_node["username"]
        node.interface_ip = static_node["name"]
        node.connection_port = static_node["connection-port"]
        node.connection_type = static_node["connection-type"]
        node.python_path = static_node["python-path"]
        node.shell_type = static_node["shell-type"]
        nodeutils.set_node_ip(node)
        node.host_keys = host_keys
        node.attributes = pool.node_attributes
        node.slot = slot
        self.zk.storeNode(node)
        self.log.debug("Registered static node %s", node_tuple)

    def updateNodeFromConfig(self, static_node):
        '''
@@ -205,8 +264,8 @@ class StaticNodeProvider(Provider, QuotaSupport):
            static_node["labels"],
            static_node["username"],
            static_node["connection-port"],
            static_node["connection-type"],
            static_node["shell-type"],
            static_node["connection-type"],
            static_node["python-path"],
            host_keys,
        )
@@ -240,7 +299,7 @@ class StaticNodeProvider(Provider, QuotaSupport):
        finally:
            self.zk.unlockNode(node)

    def deregisterNode(self, count, node_tuple):
    def deregisterNode(self, node):
        '''
        Attempt to delete READY nodes.

@@ -248,75 +307,64 @@ class StaticNodeProvider(Provider, QuotaSupport):
        let them remain until they naturally are deleted (we won't re-register
        them after they are deleted).

        :param Node node_tuple: the namedtuple Node.
        :param Node node: the zk Node object.
        '''
        self.log.debug("Deregistering %s node(s) matching %s",
                       count, node_tuple)
        node_tuple = nodeTuple(node)
        self.log.debug("Deregistering node %s", node)

        nodes = self.getRegisteredReadyNodes(node_tuple)
        try:
            self.zk.lockNode(node, blocking=False)
        except exceptions.ZKLockException:
            # It's already locked so skip it.
            return

        for node in nodes:
            if count <= 0:
                break
        # Double check the state now that we have a lock since it
        # may have changed on us. We keep using the original node
        # since it's holding the lock.
        _node = self.zk.getNode(node.id)
        if _node and _node.state != zk.READY:
            # State changed so skip it.
            self.zk.unlockNode(node)
            return

            try:
                self.zk.lockNode(node, blocking=False)
            except exceptions.ZKLockException:
                # It's already locked so skip it.
                continue
        node.state = zk.DELETING
        try:
            self.zk.storeNode(node)
            self.log.debug("Deregistered static node: id=%s, "
                           "node_tuple=%s", node.id, node_tuple)
        except Exception:
            self.log.exception("Error deregistering static node:")
        finally:
            self.zk.unlockNode(node)

            # Double check the state now that we have a lock since it
            # may have changed on us. We keep using the original node
            # since it's holding the lock.
            _node = self.zk.getNode(node.id)
            if _node.state != zk.READY:
                # State changed so skip it.
                self.zk.unlockNode(node)
                continue

            node.state = zk.DELETING
            try:
                self.zk.storeNode(node)
                self.log.debug("Deregistered static node: id=%s, "
                               "node_tuple=%s", node.id, node_tuple)
                count = count - 1
            except Exception:
                self.log.exception("Error deregistering static node:")
            finally:
                self.zk.unlockNode(node)

    def syncNodeCount(self, registered, node, pool):
        current_count = registered[nodeTuple(node)]

        # Register nodes to synchronize with our configuration.
        if current_count < node["max-parallel-jobs"]:
            register_cnt = node["max-parallel-jobs"] - current_count
            self.registerNodeFromConfig(
                register_cnt, self.provider.name, pool, node)

        # De-register nodes to synchronize with our configuration.
        # This case covers an existing node, but with a decreased
        # max-parallel-jobs value.
        elif current_count > node["max-parallel-jobs"]:
            deregister_cnt = current_count - node["max-parallel-jobs"]
            try:
                self.deregisterNode(deregister_cnt, nodeTuple(node))
            except Exception:
                self.log.exception("Couldn't deregister static node:")
    def syncNodeCount(self, static_node, pool):
        for slot, node in enumerate(self._node_slots[nodeTuple(static_node)]):
            if node is None:
                # Register nodes to synchronize with our configuration.
                self.registerNodeFromConfig(self.provider.name, pool,
                                            static_node, slot)
            elif slot >= static_node["max-parallel-jobs"]:
                # De-register nodes to synchronize with our configuration.
                # This case covers an existing node, but with a decreased
                # max-parallel-jobs value.
                try:
                    self.deregisterNode(node)
                except Exception:
                    self.log.exception("Couldn't deregister static node:")

    def _start(self, zk_conn):
        self.zk = zk_conn
        registered = self.getRegisteredNodes()
        self.getRegisteredNodes()

        static_nodes = {}
        with ThreadPoolExecutor() as executor:
            for pool in self.provider.pools.values():
                synced_nodes = []
                for node in pool.nodes:
                    synced_nodes.append((node, executor.submit(
                        self.syncNodeCount, registered, node, pool)))
                for static_node in pool.nodes:
                    synced_nodes.append((static_node, executor.submit(
                        self.syncNodeCount, static_node, pool)))

            for node, result in synced_nodes:
            for static_node, result in synced_nodes:
                try:
                    result.result()
                except StaticNodeError as exc:
@@ -324,32 +372,33 @@ class StaticNodeProvider(Provider, QuotaSupport):
                    continue
                except Exception:
                    self.log.exception("Couldn't sync node %s:",
                                       nodeTuple(node))
                                       nodeTuple(static_node))
                    continue

                try:
                    self.updateNodeFromConfig(node)
                    self.updateNodeFromConfig(static_node)
                except StaticNodeError as exc:
                    self.log.warning(
                        "Couldn't update static node: %s", exc)
                    continue
                except Exception:
                    self.log.exception("Couldn't update static node %s:",
                                       nodeTuple(node))
                                       nodeTuple(static_node))
                    continue

                static_nodes[nodeTuple(node)] = node
                static_nodes[nodeTuple(static_node)] = static_node

        # De-register nodes to synchronize with our configuration.
        # This case covers any registered nodes that no longer appear in
        # the config.
        for node in list(registered):
            if node not in static_nodes:
                try:
                    self.deregisterNode(registered[node], node)
                except Exception:
                    self.log.exception("Couldn't deregister static node:")
                    continue
        for node_tuple, nodes in self._node_slots.items():
            if node_tuple not in static_nodes:
                for node in nodes:
                    try:
                        self.deregisterNode(node)
                    except Exception:
                        self.log.exception("Couldn't deregister static node:")
                        continue

    def start(self, zk_conn):
        try:
@@ -360,15 +409,6 @@ class StaticNodeProvider(Provider, QuotaSupport):
    def stop(self):
        self.log.debug("Stopping")

    def listNodes(self):
        registered = self.getRegisteredNodes()
        servers = []
        for pool in self.provider.pools.values():
            for node in pool.nodes:
                if nodeTuple(node) in registered:
                    servers.append(node)
        return servers

    def poolNodes(self):
        return {
            nodeTuple(n): n
@@ -395,17 +435,17 @@ class StaticNodeProvider(Provider, QuotaSupport):

    def cleanupLeakedResources(self):
        with self._register_lock:
            registered = self.getRegisteredNodes()
            self.getRegisteredNodes()
            for pool in self.provider.pools.values():
                for node in pool.nodes:
                for static_node in pool.nodes:
                    try:
                        self.syncNodeCount(registered, node, pool)
                        self.syncNodeCount(static_node, pool)
                    except StaticNodeError as exc:
                        self.log.warning("Couldn't sync node: %s", exc)
                        continue
                    except Exception:
                        self.log.exception("Couldn't sync node %s:",
                                           nodeTuple(node))
                                           nodeTuple(static_node))
                        continue

    def getRequestHandler(self, poolworker, request):
@@ -424,24 +464,26 @@ class StaticNodeProvider(Provider, QuotaSupport):

        with self._register_lock:
            try:
                registered = self.getRegisteredNodes()
                self.getRegisteredNodes()
            except Exception:
                self.log.exception(
                    "Cannot get registered nodes for re-registration:"
                )
                return
            current_count = registered[node_tuple]
            slot = node.slot

            if slot is None:
                return
            # It's possible we were not able to de-register nodes due to a
            # config change (because they were in use). In that case, don't
            # bother to reregister.
            if current_count >= static_node["max-parallel-jobs"]:
            if slot >= static_node["max-parallel-jobs"]:
                return

            try:
                pool = self.provider.pools[node.pool]
                self.registerNodeFromConfig(
                    1, node.provider, pool, static_node)
                    node.provider, pool, static_node, slot)
            except StaticNodeError as exc:
                self.log.warning("Cannot re-register deleted node: %s", exc)
            except Exception:
@@ -609,7 +609,8 @@ class DBTestCase(BaseTestCase):
            if label in ready_nodes and len(ready_nodes[label]) == count:
                break
        self.wait_for_threads()
        return ready_nodes[label]
        return sorted(ready_nodes[label],
                      key=lambda x: x.id)

    def waitForAnyNodeInState(self, state):
        # Wait for a node to be in the aborted state
@@ -68,6 +68,7 @@ class TestDriverStatic(tests.DBTestCase):
                         {'key1': 'value1', 'key2': 'value2'})
        self.assertEqual(nodes[0].python_path, 'auto')
        self.assertIsNone(nodes[0].shell_type)
        self.assertEqual(nodes[0].slot, 0)

    def test_static_python_path(self):
        '''
@@ -80,6 +81,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for node pre-registration")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(nodes[0].python_path, "/usr/bin/python3")
        self.assertEqual(nodes[0].slot, 0)

        nodes[0].state = zk.USED
        self.zk.storeNode(nodes[0])
@@ -87,6 +89,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for node to be re-available")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(nodes[0].python_path, "/usr/bin/python3")
        self.assertEqual(nodes[0].slot, 0)

    def test_static_multiname(self):
        '''
@@ -101,11 +104,13 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].state, zk.READY)
        self.assertEqual(nodes[0].username, 'zuul')
        self.assertEqual(nodes[0].slot, 0)

        nodes = self.waitForNodes('other-label', 1)
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].state, zk.READY)
        self.assertEqual(nodes[0].username, 'zuul-2')
        self.assertEqual(nodes[0].slot, 0)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
@@ -150,6 +155,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(nodes[0].connection_port, 22022)
        self.assertEqual(nodes[0].connection_type, 'ssh')
        self.assertEqual(nodes[0].host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(nodes[0].slot, 0)

    def test_static_node_increase(self):
        '''
@@ -162,11 +168,14 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for initial node")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].slot, 0)

        self.log.debug("Waiting for additional node")
        self.replace_config(configfile, 'static-2-nodes.yaml')
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 0)

    def test_static_node_decrease(self):
        '''
@@ -179,12 +188,15 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for initial nodes")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 0)

        self.log.debug("Waiting for node decrease")
        self.replace_config(configfile, 'static-basic.yaml')
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].hostname, 'fake-host-1')
        self.assertEqual(nodes[0].slot, 0)

    def test_static_parallel_increase(self):
        '''
@@ -197,11 +209,14 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for initial node")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].slot, 0)

        self.log.debug("Waiting for additional node")
        self.replace_config(configfile, 'static-parallel-increase.yaml')
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 1)

    def test_static_parallel_decrease(self):
        '''
@@ -214,11 +229,14 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for initial nodes")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 1)

        self.log.debug("Waiting for node decrease")
        self.replace_config(configfile, 'static-basic.yaml')
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].slot, 0)

    def test_static_node_update(self):
        '''
@@ -242,6 +260,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(nodes[0].connection_port, 5986)
        self.assertEqual(nodes[0].connection_type, 'winrm')
        self.assertEqual(nodes[0].host_keys, [])
        self.assertEqual(nodes[0].slot, 0)

    def test_static_node_update_startup(self):
        '''
@@ -266,6 +285,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(nodes[0].id, "0000000000")
        self.assertIn('fake-label', nodes[0].type)
        self.assertIn('fake-label2', nodes[0].type)
        self.assertEqual(nodes[0].slot, 0)

    def test_static_multilabel(self):
        configfile = self.setup_config('static-multilabel.yaml')
@@ -274,23 +294,26 @@ class TestDriverStatic(tests.DBTestCase):
        nodes = self.waitForNodes('fake-label')
        self.assertIn('fake-label', nodes[0].type)
        self.assertIn('fake-label2', nodes[0].type)
        self.assertEqual(nodes[0].slot, 0)

    def test_static_handler(self):
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        node = self.waitForNodes('fake-label')
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(nodes[0].slot, 0)
        self.waitForNodes('fake-concurrent-label', 2)

        node = node[0]
        node = nodes[0]
        self.log.debug("Marking first node as used %s", node.id)
        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)

        self.log.debug("Waiting for node to be re-available")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].slot, 0)

    def test_static_waiting_handler(self):
        configfile = self.setup_config('static-2-nodes-multilabel.yaml')
@@ -390,6 +413,7 @@ class TestDriverStatic(tests.DBTestCase):
        # Make sure the node is not reallocated
        node = self.zk.getNode(req.nodes[0])
        self.assertIsNotNone(node)
        self.assertEqual(node.slot, 0)

    def test_static_waiting_handler_order(self):
        configfile = self.setup_config('static-basic.yaml')
@@ -402,6 +426,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req, zk.FULFILLED)
        node = self.zk.getNode(req.nodes[0])
        self.assertEqual(node.slot, 0)
        self.zk.lockNode(node)
        node.state = zk.USED
        self.zk.storeNode(node)
@@ -431,6 +456,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(req_waiting3.state, zk.REQUESTED)

        node_waiting2 = self.zk.getNode(req_waiting2.nodes[0])
        self.assertEqual(node_waiting2.slot, 0)
        self.zk.lockNode(node_waiting2)
        node_waiting2.state = zk.USED
        self.zk.storeNode(node_waiting2)
@@ -441,6 +467,7 @@ class TestDriverStatic(tests.DBTestCase):
        self.assertEqual(req_waiting1.state, zk.REQUESTED)

        node_waiting3 = self.zk.getNode(req_waiting3.nodes[0])
        self.assertEqual(node_waiting3.slot, 0)
        self.zk.lockNode(node_waiting3)
        node_waiting3.state = zk.USED
        self.zk.storeNode(node_waiting3)
@@ -502,6 +529,7 @@ class TestDriverStatic(tests.DBTestCase):
        pool.start()
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].slot, 0)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
@@ -522,6 +550,7 @@ class TestDriverStatic(tests.DBTestCase):
        new_nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(new_nodes), 1)
        self.assertEqual(nodes[0].hostname, new_nodes[0].hostname)
        self.assertEqual(nodes[0].slot, 0)

    def test_liveness_check(self):
        '''
@@ -642,9 +671,13 @@ class TestDriverStatic(tests.DBTestCase):
        self.log.debug("Waiting for initial nodes")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 0)

        self.zk.deleteNode(nodes[0])

        self.log.debug("Waiting for node to transition to ready again")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0].slot, 0)
        self.assertEqual(nodes[1].slot, 0)
@@ -520,6 +520,7 @@ class Node(BaseModel):
        self.host_keys = []
        self.hold_expiration = None
        self.resources = None
        self.slot = None
        self.attributes = None
        self.python_path = None
        self.tenant_name = None
@@ -564,6 +565,7 @@ class Node(BaseModel):
                    self.host_keys == other.host_keys and
                    self.hold_expiration == other.hold_expiration and
                    self.resources == other.resources and
                    self.slot == other.slot and
                    self.attributes == other.attributes and
                    self.python_path == other.python_path and
                    self.tenant_name == other.tenant_name and
@@ -618,6 +620,7 @@ class Node(BaseModel):
        d['shell_type'] = self.shell_type
        d['hold_expiration'] = self.hold_expiration
        d['resources'] = self.resources
        d['slot'] = self.slot
        d['attributes'] = self.attributes
        d['python_path'] = self.python_path
        d['tenant_name'] = self.tenant_name
@@ -686,6 +689,7 @@ class Node(BaseModel):
        else:
            self.hold_expiration = hold_expiration
        self.resources = d.get('resources')
        self.slot = d.get('slot')
        self.attributes = d.get('attributes')
        self.python_path = d.get('python_path')
        self.shell_type = d.get('shell_type')