From 08fdeed24110d278e845b2a17010f08e713f1e49 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Tue, 7 Sep 2021 17:26:31 -0700
Subject: [PATCH] Add "slots" to static node driver

Add persistent slot numbers for static nodes.  This facilitates
avoiding workspace collisions on nodes with max-parallel-jobs > 1.

Change-Id: I30bbfc79a60b9e15f1255ad001a879521a181294
---
 nodepool/driver/static/provider.py        | 306 ++++++++++++----------
 nodepool/tests/__init__.py                |   3 +-
 nodepool/tests/unit/test_driver_static.py |  41 ++-
 nodepool/zk/zookeeper.py                  |   4 +
 4 files changed, 217 insertions(+), 137 deletions(-)

diff --git a/nodepool/driver/static/provider.py b/nodepool/driver/static/provider.py
index c8e10f278..98decb7f7 100644
--- a/nodepool/driver/static/provider.py
+++ b/nodepool/driver/static/provider.py
@@ -17,7 +17,6 @@ import logging
 import math
 import threading
 from concurrent.futures.thread import ThreadPoolExecutor
-
 from collections import Counter, namedtuple
 
 from nodepool import exceptions
@@ -33,15 +32,16 @@ class StaticNodeError(Exception):
     pass
 
 
-Node = namedtuple("Node", ["hostname", "username", "port"])
+NodeTuple = namedtuple("NodeTuple", ["hostname", "username", "port"])
 
 
 def nodeTuple(node):
     """Return an unique identifier tuple for a static node"""
     if isinstance(node, dict):
-        return Node(node["name"], node["username"], node["connection-port"])
+        return NodeTuple(node["name"], node["username"],
+                         node["connection-port"])
     else:
-        return Node(node.hostname, node.username, node.connection_port)
+        return NodeTuple(node.hostname, node.username, node.connection_port)
 
 
 class StaticNodeProvider(Provider, QuotaSupport):
@@ -58,34 +58,39 @@ class StaticNodeProvider(Provider, QuotaSupport):
         # Lock to avoid data races when registering nodes from
         # multiple threads (e.g. cleanup and deleted node worker).
         self._register_lock = threading.Lock()
+        self._node_slots = {}  # nodeTuple -> [node]
 
-    def checkHost(self, node):
+    def _getSlot(self, node):
+        return self._node_slots[nodeTuple(node)].index(node)
+
+    def checkHost(self, static_node):
         '''Check node is reachable'''
         # only gather host keys if the connection type is ssh or network_cli
         gather_hostkeys = (
-            node["connection-type"] == 'ssh' or
-            node["connection-type"] == 'network_cli')
-        if gather_hostkeys and not node.get('host-key-checking', True):
-            return node['host-key']
+            static_node["connection-type"] == 'ssh' or
+            static_node["connection-type"] == 'network_cli')
+        if gather_hostkeys and not static_node.get('host-key-checking', True):
+            return static_node['host-key']
 
         try:
-            keys = nodeutils.nodescan(node["name"],
-                                      port=node["connection-port"],
-                                      timeout=node["timeout"],
+            keys = nodeutils.nodescan(static_node["name"],
+                                      port=static_node["connection-port"],
+                                      timeout=static_node["timeout"],
                                       gather_hostkeys=gather_hostkeys)
         except exceptions.ConnectionTimeoutException:
             raise StaticNodeError(
-                "{}: ConnectionTimeoutException".format(nodeTuple(node)))
+                "{}: ConnectionTimeoutException".format(
+                    nodeTuple(static_node)))
 
         if not gather_hostkeys:
             return []
 
         # Check node host-key
-        if set(node["host-key"]).issubset(set(keys)):
+        if set(static_node["host-key"]).issubset(set(keys)):
             return keys
 
-        node_tuple = nodeTuple(node)
+        node_tuple = nodeTuple(static_node)
         self.log.debug("%s: Registered key '%s' not in %s",
-                       node_tuple, node["host-key"], keys)
+                       node_tuple, static_node["host-key"], keys)
         raise StaticNodeError(
             "{}: host key mismatches ({})".format(node_tuple, keys))
 
@@ -128,12 +133,20 @@ class StaticNodeProvider(Provider, QuotaSupport):
                 node_tuple, exc)
             try:
-                self.deregisterNode(count=1, node_tuple=node_tuple)
+                self.deregisterNode(node)
             except Exception:
                 self.log.exception("Couldn't deregister static node:")
             return False
 
+    def _debugSlots(self, node_slots, unslotted_nodes=None):
+        for k, nodes in node_slots.items():
+            self.log.debug("Slot status for %s:", k)
+            for i, node in enumerate(nodes):
+                self.log.debug("Slot %i: %s", i, getattr(node, 'id', None))
+        if unslotted_nodes is not None:
+            self.log.debug("Unslotted nodes: %s", unslotted_nodes)
+
     def getRegisteredNodes(self):
         '''
         Get node tuples for all registered static nodes.
@@ -143,50 +156,96 @@ class StaticNodeProvider(Provider, QuotaSupport):
         :returns: A set of registered (hostnames, usernames, ports) tuple
                   for the static driver.
         '''
-        registered = Counter()
+        unslotted_nodes = []
+        node_slots = {}
+
+        # Initialize our slot counters for each node tuple.
+        for pool in self.provider.pools.values():
+            for static_node in pool.nodes:
+                node_slots[nodeTuple(static_node)] = [
+                    None for x in range(static_node["max-parallel-jobs"])]
+
+        # Find all nodes with slot ids and store them in node_slots.
         for node in self.zk.nodeIterator():
             if node.provider != self.provider.name:
                 continue
-            registered.update([nodeTuple(node)])
-        return registered
+            if node.state in {zk.BUILDING, zk.DELETING}:
+                continue
+            if nodeTuple(node) in node_slots:
+                if (node.slot is not None and
+                        len(node_slots[nodeTuple(node)]) > node.slot and
+                        node_slots[nodeTuple(node)][node.slot] is None):
+                    node_slots[nodeTuple(node)][node.slot] = node
+                else:
+                    # We have more registered nodes of this type than
+                    # slots; there may have been a reduction; track in
+                    # order to delete.
+                    unslotted_nodes.append(node)
+            else:
+                # We don't know anything about this node; it may have
+                # been removed from the config.  We still need to
+                # track it in order to decide to delete it.
+                node_slots[nodeTuple(node)] = []
+                unslotted_nodes.append(node)
 
-    def registerNodeFromConfig(self, count, provider_name, pool,
-                               static_node):
-        '''
-        Register a static node from the config with ZooKeeper.
+        # This can be very chatty, so we don't normally log it.  It
+        # can be helpful when debugging tests.
+        # self._debugSlots(node_slots, unslotted_nodes)
 
-        A node can be registered multiple times to support max-parallel-jobs.
-        These nodes will share the same node tuple.
+        # Find all nodes without slot ids, store each in first available slot
+        for node in unslotted_nodes:
+            if None in node_slots[nodeTuple(node)]:
+                # This is a backwards-compat case; we have room for
+                # the node, it's just that the node metadata doesn't
+                # have a slot number.
+                idx = node_slots[nodeTuple(node)].index(None)
+                node_slots[nodeTuple(node)][idx] = node
+            else:
+                # We have more nodes than expected.
+                self.log.warning("Tracking excess node %s as %s slot %s",
+                                 node, nodeTuple(node),
+                                 len(node_slots[nodeTuple(node)]))
+                node_slots[nodeTuple(node)].append(node)
+        self._node_slots = node_slots
+
+    def registerNodeFromConfig(self, provider_name, pool, static_node,
+                               slot):
+        '''Register a static node from the config with ZooKeeper.
+
+        A node can be registered multiple times to support
+        max-parallel-jobs.  These nodes will share the same node tuple
+        but have distinct slot numbers.
 
-        :param int count: Number of times to register this node.
         :param str provider_name: Name of the provider.
         :param str pool: Config of the pool owning the node.
         :param dict static_node: The node definition from the config file.
+        :param int slot: The slot number for this node.
+
         '''
         pool_name = pool.name
         host_keys = self.checkHost(static_node)
         node_tuple = nodeTuple(static_node)
-        for i in range(0, count):
-            node = zk.Node()
-            node.state = zk.READY
-            node.provider = provider_name
-            node.pool = pool_name
-            node.launcher = "static driver"
-            node.type = static_node["labels"]
-            node.external_id = static_node["name"]
-            node.hostname = static_node["name"]
-            node.username = static_node["username"]
-            node.interface_ip = static_node["name"]
-            node.connection_port = static_node["connection-port"]
-            node.connection_type = static_node["connection-type"]
-            node.python_path = static_node["python-path"]
-            node.shell_type = static_node["shell-type"]
-            nodeutils.set_node_ip(node)
-            node.host_keys = host_keys
-            node.attributes = pool.node_attributes
-            self.zk.storeNode(node)
-            self.log.debug("Registered static node %s", node_tuple)
+        node = zk.Node()
+        node.state = zk.READY
+        node.provider = provider_name
+        node.pool = pool_name
+        node.launcher = "static driver"
+        node.type = static_node["labels"]
+        node.external_id = static_node["name"]
+        node.hostname = static_node["name"]
+        node.username = static_node["username"]
+        node.interface_ip = static_node["name"]
+        node.connection_port = static_node["connection-port"]
+        node.connection_type = static_node["connection-type"]
+        node.python_path = static_node["python-path"]
+        node.shell_type = static_node["shell-type"]
+        nodeutils.set_node_ip(node)
+        node.host_keys = host_keys
+        node.attributes = pool.node_attributes
+        node.slot = slot
+        self.zk.storeNode(node)
+        self.log.debug("Registered static node %s", node_tuple)
 
     def updateNodeFromConfig(self, static_node):
         '''
@@ -205,8 +264,8 @@ class StaticNodeProvider(Provider, QuotaSupport):
             static_node["labels"],
             static_node["username"],
             static_node["connection-port"],
-            static_node["connection-type"],
             static_node["shell-type"],
+            static_node["connection-type"],
             static_node["python-path"],
             host_keys,
         )
@@ -240,7 +299,7 @@ class StaticNodeProvider(Provider, QuotaSupport):
             finally:
                 self.zk.unlockNode(node)
 
-    def deregisterNode(self, count, node_tuple):
+    def deregisterNode(self, node):
         '''
         Attempt to delete READY nodes.
@@ -248,75 +307,64 @@ class StaticNodeProvider(Provider, QuotaSupport):
         let them remain until they naturally are deleted (we won't
         re-register them after they are deleted).
 
-        :param Node node_tuple: the namedtuple Node.
+        :param Node node: the zk Node object.
         '''
-        self.log.debug("Deregistering %s node(s) matching %s",
-                       count, node_tuple)
+        node_tuple = nodeTuple(node)
+        self.log.debug("Deregistering node %s", node)
 
-        nodes = self.getRegisteredReadyNodes(node_tuple)
+        try:
+            self.zk.lockNode(node, blocking=False)
+        except exceptions.ZKLockException:
+            # It's already locked so skip it.
+            return
 
-        for node in nodes:
-            if count <= 0:
-                break
+        # Double check the state now that we have a lock since it
+        # may have changed on us.  We keep using the original node
+        # since it's holding the lock.
+        _node = self.zk.getNode(node.id)
+        if _node and _node.state != zk.READY:
+            # State changed so skip it.
+            self.zk.unlockNode(node)
+            return
 
-            try:
-                self.zk.lockNode(node, blocking=False)
-            except exceptions.ZKLockException:
-                # It's already locked so skip it.
-                continue
+        node.state = zk.DELETING
+        try:
+            self.zk.storeNode(node)
+            self.log.debug("Deregistered static node: id=%s, "
+                           "node_tuple=%s", node.id, node_tuple)
+        except Exception:
+            self.log.exception("Error deregistering static node:")
+        finally:
+            self.zk.unlockNode(node)
 
-            # Double check the state now that we have a lock since it
-            # may have changed on us. We keep using the original node
-            # since it's holding the lock.
-            _node = self.zk.getNode(node.id)
-            if _node.state != zk.READY:
-                # State changed so skip it.
-                self.zk.unlockNode(node)
-                continue
-
-            node.state = zk.DELETING
-            try:
-                self.zk.storeNode(node)
-                self.log.debug("Deregistered static node: id=%s, "
-                               "node_tuple=%s", node.id, node_tuple)
-                count = count - 1
-            except Exception:
-                self.log.exception("Error deregistering static node:")
-            finally:
-                self.zk.unlockNode(node)
-
-    def syncNodeCount(self, registered, node, pool):
-        current_count = registered[nodeTuple(node)]
-
-        # Register nodes to synchronize with our configuration.
-        if current_count < node["max-parallel-jobs"]:
-            register_cnt = node["max-parallel-jobs"] - current_count
-            self.registerNodeFromConfig(
-                register_cnt, self.provider.name, pool, node)
-
-        # De-register nodes to synchronize with our configuration.
-        # This case covers an existing node, but with a decreased
-        # max-parallel-jobs value.
-        elif current_count > node["max-parallel-jobs"]:
-            deregister_cnt = current_count - node["max-parallel-jobs"]
-            try:
-                self.deregisterNode(deregister_cnt, nodeTuple(node))
-            except Exception:
-                self.log.exception("Couldn't deregister static node:")
+    def syncNodeCount(self, static_node, pool):
+        for slot, node in enumerate(self._node_slots[nodeTuple(static_node)]):
+            if node is None:
+                # Register nodes to synchronize with our configuration.
+                self.registerNodeFromConfig(self.provider.name, pool,
+                                            static_node, slot)
+            elif slot >= static_node["max-parallel-jobs"]:
+                # De-register nodes to synchronize with our configuration.
+                # This case covers an existing node, but with a decreased
+                # max-parallel-jobs value.
+                try:
+                    self.deregisterNode(node)
+                except Exception:
+                    self.log.exception("Couldn't deregister static node:")
 
     def _start(self, zk_conn):
         self.zk = zk_conn
-        registered = self.getRegisteredNodes()
+        self.getRegisteredNodes()
 
         static_nodes = {}
         with ThreadPoolExecutor() as executor:
             for pool in self.provider.pools.values():
                 synced_nodes = []
-                for node in pool.nodes:
-                    synced_nodes.append((node, executor.submit(
-                        self.syncNodeCount, registered, node, pool)))
+                for static_node in pool.nodes:
+                    synced_nodes.append((static_node, executor.submit(
                        self.syncNodeCount, static_node, pool)))
 
-                for node, result in synced_nodes:
+                for static_node, result in synced_nodes:
                     try:
                         result.result()
                     except StaticNodeError as exc:
@@ -324,32 +372,33 @@ class StaticNodeProvider(Provider, QuotaSupport):
                         continue
                     except Exception:
                         self.log.exception("Couldn't sync node %s:",
-                                           nodeTuple(node))
+                                           nodeTuple(static_node))
                         continue
 
                     try:
-                        self.updateNodeFromConfig(node)
+                        self.updateNodeFromConfig(static_node)
                     except StaticNodeError as exc:
                         self.log.warning(
                             "Couldn't update static node: %s", exc)
                         continue
                     except Exception:
                         self.log.exception("Couldn't update static node %s:",
-                                           nodeTuple(node))
+                                           nodeTuple(static_node))
                         continue
 
-                    static_nodes[nodeTuple(node)] = node
+                    static_nodes[nodeTuple(static_node)] = static_node
 
         # De-register nodes to synchronize with our configuration.
        # This case covers any registered nodes that no longer appear in
         # the config.
-        for node in list(registered):
-            if node not in static_nodes:
-                try:
-                    self.deregisterNode(registered[node], node)
-                except Exception:
-                    self.log.exception("Couldn't deregister static node:")
-                    continue
+        for node_tuple, nodes in self._node_slots.items():
+            if node_tuple not in static_nodes:
+                for node in nodes:
+                    try:
+                        self.deregisterNode(node)
+                    except Exception:
+                        self.log.exception("Couldn't deregister static node:")
+                        continue
 
     def start(self, zk_conn):
         try:
@@ -360,15 +409,6 @@ class StaticNodeProvider(Provider, QuotaSupport):
     def stop(self):
         self.log.debug("Stopping")
 
-    def listNodes(self):
-        registered = self.getRegisteredNodes()
-        servers = []
-        for pool in self.provider.pools.values():
-            for node in pool.nodes:
-                if nodeTuple(node) in registered:
-                    servers.append(node)
-        return servers
-
     def poolNodes(self):
         return {
             nodeTuple(n): n
@@ -395,17 +435,17 @@ class StaticNodeProvider(Provider, QuotaSupport):
 
     def cleanupLeakedResources(self):
         with self._register_lock:
-            registered = self.getRegisteredNodes()
+            self.getRegisteredNodes()
             for pool in self.provider.pools.values():
-                for node in pool.nodes:
+                for static_node in pool.nodes:
                     try:
-                        self.syncNodeCount(registered, node, pool)
+                        self.syncNodeCount(static_node, pool)
                     except StaticNodeError as exc:
                         self.log.warning("Couldn't sync node: %s", exc)
                         continue
                     except Exception:
                         self.log.exception("Couldn't sync node %s:",
-                                           nodeTuple(node))
+                                           nodeTuple(static_node))
                         continue
 
     def getRequestHandler(self, poolworker, request):
@@ -424,24 +464,26 @@ class StaticNodeProvider(Provider, QuotaSupport):
 
         with self._register_lock:
             try:
-                registered = self.getRegisteredNodes()
+                self.getRegisteredNodes()
             except Exception:
                 self.log.exception(
                     "Cannot get registered nodes for re-registration:"
                 )
                 return
-            current_count = registered[node_tuple]
+            slot = node.slot
+            if slot is None:
+                return
 
             # It's possible we were not able to de-register nodes due to a
            # config change (because they were in use). In that case, don't
             # bother to reregister.
-            if current_count >= static_node["max-parallel-jobs"]:
+            if slot >= static_node["max-parallel-jobs"]:
                 return
 
             try:
                 pool = self.provider.pools[node.pool]
                 self.registerNodeFromConfig(
-                    1, node.provider, pool, static_node)
+                    node.provider, pool, static_node, slot)
             except StaticNodeError as exc:
                 self.log.warning("Cannot re-register deleted node: %s", exc)
             except Exception:
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 602c98243..b352a3183 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -609,7 +609,8 @@ class DBTestCase(BaseTestCase):
             if label in ready_nodes and len(ready_nodes[label]) == count:
                 break
         self.wait_for_threads()
-        return ready_nodes[label]
+        return sorted(ready_nodes[label],
+                      key=lambda x: x.id)
 
     def waitForAnyNodeInState(self, state):
         # Wait for a node to be in the aborted state
diff --git a/nodepool/tests/unit/test_driver_static.py b/nodepool/tests/unit/test_driver_static.py
index b5de5d44d..206cacb65 100644
--- a/nodepool/tests/unit/test_driver_static.py
+++ b/nodepool/tests/unit/test_driver_static.py
@@ -68,6 +68,7 @@ class TestDriverStatic(tests.DBTestCase):
                          {'key1': 'value1', 'key2': 'value2'})
         self.assertEqual(nodes[0].python_path, 'auto')
         self.assertIsNone(nodes[0].shell_type)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_python_path(self):
         '''
@@ -80,6 +81,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for node pre-registration")
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(nodes[0].python_path, "/usr/bin/python3")
+        self.assertEqual(nodes[0].slot, 0)
 
         nodes[0].state = zk.USED
         self.zk.storeNode(nodes[0])
@@ -87,6 +89,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for node to be re-available")
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(nodes[0].python_path, "/usr/bin/python3")
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_multiname(self):
         '''
@@ -101,11 +104,13 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(len(nodes), 1)
         self.assertEqual(nodes[0].state, zk.READY)
         self.assertEqual(nodes[0].username, 'zuul')
+        self.assertEqual(nodes[0].slot, 0)
 
         nodes = self.waitForNodes('other-label', 1)
         self.assertEqual(len(nodes), 1)
         self.assertEqual(nodes[0].state, zk.READY)
         self.assertEqual(nodes[0].username, 'zuul-2')
+        self.assertEqual(nodes[0].slot, 0)
 
         req = zk.NodeRequest()
         req.state = zk.REQUESTED
@@ -150,6 +155,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(nodes[0].connection_port, 22022)
         self.assertEqual(nodes[0].connection_type, 'ssh')
         self.assertEqual(nodes[0].host_keys, ['ssh-rsa FAKEKEY'])
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_node_increase(self):
         '''
@@ -162,11 +168,14 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for initial node")
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].slot, 0)
 
         self.log.debug("Waiting for additional node")
         self.replace_config(configfile, 'static-2-nodes.yaml')
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 0)
 
     def test_static_node_decrease(self):
         '''
@@ -179,12 +188,15 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for initial nodes")
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 0)
 
         self.log.debug("Waiting for node decrease")
         self.replace_config(configfile, 'static-basic.yaml')
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(nodes), 1)
         self.assertEqual(nodes[0].hostname, 'fake-host-1')
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_parallel_increase(self):
         '''
@@ -197,11 +209,14 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for initial node")
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].slot, 0)
 
         self.log.debug("Waiting for additional node")
         self.replace_config(configfile, 'static-parallel-increase.yaml')
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 1)
 
     def test_static_parallel_decrease(self):
         '''
@@ -214,11 +229,14 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for initial nodes")
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 1)
 
         self.log.debug("Waiting for node decrease")
         self.replace_config(configfile, 'static-basic.yaml')
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_node_update(self):
         '''
@@ -242,6 +260,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(nodes[0].connection_port, 5986)
         self.assertEqual(nodes[0].connection_type, 'winrm')
         self.assertEqual(nodes[0].host_keys, [])
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_node_update_startup(self):
         '''
@@ -266,6 +285,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(nodes[0].id, "0000000000")
         self.assertIn('fake-label', nodes[0].type)
         self.assertIn('fake-label2', nodes[0].type)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_multilabel(self):
         configfile = self.setup_config('static-multilabel.yaml')
@@ -274,23 +294,26 @@ class TestDriverStatic(tests.DBTestCase):
         nodes = self.waitForNodes('fake-label')
         self.assertIn('fake-label', nodes[0].type)
         self.assertIn('fake-label2', nodes[0].type)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_handler(self):
         configfile = self.setup_config('static.yaml')
         pool = self.useNodepool(configfile, watermark_sleep=1)
         pool.start()
-        node = self.waitForNodes('fake-label')
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(nodes[0].slot, 0)
         self.waitForNodes('fake-concurrent-label', 2)
 
-        node = node[0]
+        node = nodes[0]
         self.log.debug("Marking first node as used %s", node.id)
         node.state = zk.USED
         self.zk.storeNode(node)
         self.waitForNodeDeletion(node)
 
         self.log.debug("Waiting for node to be re-available")
-        node = self.waitForNodes('fake-label')
-        self.assertEqual(len(node), 1)
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_static_waiting_handler(self):
         configfile = self.setup_config('static-2-nodes-multilabel.yaml')
@@ -390,6 +413,7 @@ class TestDriverStatic(tests.DBTestCase):
         # Make sure the node is not reallocated
         node = self.zk.getNode(req.nodes[0])
         self.assertIsNotNone(node)
+        self.assertEqual(node.slot, 0)
 
     def test_static_waiting_handler_order(self):
         configfile = self.setup_config('static-basic.yaml')
@@ -402,6 +426,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.zk.storeNodeRequest(req)
         req = self.waitForNodeRequest(req, zk.FULFILLED)
         node = self.zk.getNode(req.nodes[0])
+        self.assertEqual(node.slot, 0)
         self.zk.lockNode(node)
         node.state = zk.USED
         self.zk.storeNode(node)
@@ -431,6 +456,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(req_waiting3.state, zk.REQUESTED)
 
         node_waiting2 = self.zk.getNode(req_waiting2.nodes[0])
+        self.assertEqual(node_waiting2.slot, 0)
         self.zk.lockNode(node_waiting2)
         node_waiting2.state = zk.USED
         self.zk.storeNode(node_waiting2)
@@ -441,6 +467,7 @@ class TestDriverStatic(tests.DBTestCase):
         self.assertEqual(req_waiting1.state, zk.REQUESTED)
 
         node_waiting3 = self.zk.getNode(req_waiting3.nodes[0])
+        self.assertEqual(node_waiting3.slot, 0)
         self.zk.lockNode(node_waiting3)
         node_waiting3.state = zk.USED
         self.zk.storeNode(node_waiting3)
@@ -502,6 +529,7 @@ class TestDriverStatic(tests.DBTestCase):
         pool.start()
         nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].slot, 0)
 
         req = zk.NodeRequest()
         req.state = zk.REQUESTED
@@ -522,6 +550,7 @@ class TestDriverStatic(tests.DBTestCase):
         new_nodes = self.waitForNodes('fake-label')
         self.assertEqual(len(new_nodes), 1)
         self.assertEqual(nodes[0].hostname, new_nodes[0].hostname)
+        self.assertEqual(nodes[0].slot, 0)
 
     def test_liveness_check(self):
         '''
@@ -642,9 +671,13 @@ class TestDriverStatic(tests.DBTestCase):
         self.log.debug("Waiting for initial nodes")
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 0)
 
         self.zk.deleteNode(nodes[0])
 
         self.log.debug("Waiting for node to transition to ready again")
         nodes = self.waitForNodes('fake-label', 2)
         self.assertEqual(len(nodes), 2)
+        self.assertEqual(nodes[0].slot, 0)
+        self.assertEqual(nodes[1].slot, 0)
diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py
index b693d358a..866b1db99 100644
--- a/nodepool/zk/zookeeper.py
+++ b/nodepool/zk/zookeeper.py
@@ -520,6 +520,7 @@ class Node(BaseModel):
         self.host_keys = []
         self.hold_expiration = None
         self.resources = None
+        self.slot = None
         self.attributes = None
         self.python_path = None
         self.tenant_name = None
@@ -564,6 +565,7 @@ class Node(BaseModel):
                 self.host_keys == other.host_keys and
                 self.hold_expiration == other.hold_expiration and
                 self.resources == other.resources and
+                self.slot == other.slot and
                 self.attributes == other.attributes and
                 self.python_path == other.python_path and
                 self.tenant_name == other.tenant_name and
@@ -618,6 +620,7 @@ class Node(BaseModel):
         d['shell_type'] = self.shell_type
         d['hold_expiration'] = self.hold_expiration
         d['resources'] = self.resources
+        d['slot'] = self.slot
         d['attributes'] = self.attributes
         d['python_path'] = self.python_path
         d['tenant_name'] = self.tenant_name
@@ -686,6 +689,7 @@ class Node(BaseModel):
         else:
             self.hold_expiration = hold_expiration
         self.resources = d.get('resources')
+        self.slot = d.get('slot')
         self.attributes = d.get('attributes')
         self.python_path = d.get('python_path')
         self.shell_type = d.get('shell_type')
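
For readers skimming the change, the following is a small standalone sketch (not part of the patch, and not nodepool API) of the slot bookkeeping that getRegisteredNodes() now performs: each configured static node gets a fixed list of max-parallel-jobs slots, a registered ZooKeeper node keeps the slot index stored in its metadata, and nodes registered before this change (whose metadata has no slot) are backfilled into the first free slot. The RegisteredNode type and the workspace path at the end are illustrative assumptions only.

    from collections import namedtuple

    # Hypothetical stand-in for a node record loaded from ZooKeeper; only
    # the fields needed for this illustration are modelled.
    RegisteredNode = namedtuple("RegisteredNode", ["id", "slot"])


    def assign_slots(max_parallel_jobs, registered):
        """Mimic the slot assignment done by getRegisteredNodes()."""
        slots = [None] * max_parallel_jobs
        unslotted = []
        for node in registered:
            # Nodes that already carry a valid, unoccupied slot keep it.
            if (node.slot is not None and node.slot < len(slots)
                    and slots[node.slot] is None):
                slots[node.slot] = node
            else:
                # Backwards-compat: pre-slot nodes (slot is None) or excess
                # nodes fall through and are placed afterwards.
                unslotted.append(node)
        for node in unslotted:
            if None in slots:
                idx = slots.index(None)
                slots[idx] = node._replace(slot=idx)
        return slots


    # With max-parallel-jobs: 2, two jobs can run on one host at once; each
    # registration lands in a distinct slot, so a consumer could key its
    # workspace on (hostname, slot) rather than hostname alone.  The path
    # below is an example only, not anything nodepool itself creates.
    for node in assign_slots(2, [RegisteredNode("0001", 1),
                                 RegisteredNode("0002", None)]):
        print("/srv/workspaces/fake-host-1/slot-%d" % node.slot)

Because the real driver persists the slot number on the zk.Node record (node.slot) and re-registration after deletion reuses that same slot, the index stays stable for the life of the configured node, which is what makes per-slot workspaces collision-free across consecutive jobs.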