diff --git a/nodepool/launcher.py b/nodepool/launcher.py
index 5b260a9e6..e1f65dd55 100644
--- a/nodepool/launcher.py
+++ b/nodepool/launcher.py
@@ -72,7 +72,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
         self.pool_name = pool_name
         self.running = False
         self.stop_event = threading.Event()
-        self.paused_handler = None
+        self.paused_handlers = set()
         self.request_handlers = []
         self.watermark_sleep = nodepool.watermark_sleep
         self.zk = self.getZK()
@@ -219,7 +219,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
             rh = pm.getRequestHandler(self, req)
             reasons_to_decline = rh.getDeclinedReasons()
 
-            if self.paused_handler and not reasons_to_decline:
+            if self.paused_handlers and not reasons_to_decline:
                 self.log.debug("Handler is paused, deferring request")
                 continue
 
@@ -275,7 +275,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                 label_quota[node_type] -= 1
 
             if rh.paused:
-                self.paused_handler = rh
+                self.paused_handlers.add(rh)
             self.request_handlers.append(rh)
 
             # if we exceeded the timeout stop iterating here
@@ -295,7 +295,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                 if not r.poll():
                     active_handlers.append(r)
                     if r.paused:
-                        self.paused_handler = r
+                        self.paused_handlers.add(r)
                 else:
                     log.debug("Removing request handler")
             except kze.SessionExpiredError:
@@ -435,17 +435,19 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                 self.updateTenantLimits(
                     self.nodepool.config.tenant_resource_limits)
 
-                if not self.paused_handler:
-                    self.component_info.paused = False
-                else:
+                if self.paused_handlers:
                     self.component_info.paused = True
-                    # If we are paused, one request handler could not
+                    # If we are paused, each paused request handler could not
                     # satisfy its assigned request, so give it
-                    # another shot. Unpause ourselves if it completed.
-                    self.paused_handler.run()
-                    if not self.paused_handler.paused:
-                        self.paused_handler = None
-                        self.component_info.paused = False
+                    # another shot. Unpause ourselves once all have completed.
+                    for rh in sorted(self.paused_handlers,
                                     key=lambda h: h.request.priority):
+                        rh.run()
+                        if not rh.paused:
+                            self.paused_handlers.remove(rh)
+
+                    if not self.paused_handlers:
+                        self.component_info.paused = False
 
             # Regardless of whether we are paused, run
             # assignHandlers. It will only accept requests if we
@@ -463,8 +465,9 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
             self.stop_event.wait(self.watermark_sleep)
 
         # Cleanup on exit
-        if self.paused_handler:
-            self.paused_handler.unlockNodeSet(clear_allocation=True)
+        if self.paused_handlers:
+            for rh in self.paused_handlers:
+                rh.unlockNodeSet(clear_allocation=True)
 
     def stop(self):
         '''
diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py
index 164ea419a..db8f51e89 100644
--- a/nodepool/tests/unit/test_driver_aws.py
+++ b/nodepool/tests/unit/test_driver_aws.py
@@ -261,7 +261,7 @@ class TestDriverAws(tests.DBTestCase):
         # second request.
         pool_worker = pool.getPoolWorkers('ec2-us-west-2')
         for _ in iterate_timeout(30, Exception, 'paused handler'):
-            if pool_worker[0].paused_handler:
+            if pool_worker[0].paused_handlers:
                 break
 
         # Release the first node so that the second can be fulfilled.
@@ -316,7 +316,7 @@ class TestDriverAws(tests.DBTestCase):
         # second request.
         pool_worker = pool.getPoolWorkers('ec2-us-west-2')
         for _ in iterate_timeout(30, Exception, 'paused handler'):
-            if pool_worker[0].paused_handler:
+            if pool_worker[0].paused_handlers:
                 break
 
         # Release the first node so that the second can be fulfilled.
diff --git a/nodepool/tests/unit/test_driver_kubernetes.py b/nodepool/tests/unit/test_driver_kubernetes.py
index e26fb6949..bff9bb9d6 100644
--- a/nodepool/tests/unit/test_driver_kubernetes.py
+++ b/nodepool/tests/unit/test_driver_kubernetes.py
@@ -263,7 +263,7 @@ class TestDriverKubernetes(tests.DBTestCase):
         if pause:
             # The previous request should pause the handler
             pool_worker = pool.getPoolWorkers('kubespray')
-            while not pool_worker[0].paused_handler:
+            while not pool_worker[0].paused_handlers:
                 time.sleep(0.1)
         else:
             self.waitForNodeRequest(max_req, (zk.REQUESTED,))
diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py
index 03bb93aee..a0ea800a2 100644
--- a/nodepool/tests/unit/test_launcher.py
+++ b/nodepool/tests/unit/test_launcher.py
@@ -235,7 +235,7 @@ class TestLauncher(tests.DBTestCase):
         # Wait until there is a paused request handler and check if there
         # are exactly two servers
         pool_worker = pool.getPoolWorkers('fake-provider')
-        while not pool_worker[0].paused_handler:
+        while not pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
         self.assertEqual(len(client._server_list), 2)
@@ -512,7 +512,7 @@ class TestLauncher(tests.DBTestCase):
         self.zk.storeNodeRequest(req2)
 
         pool_worker = pool.getPoolWorkers('fake-provider')
-        while not pool_worker[0].paused_handler:
+        while not pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
         # The handler is paused now and the request should be in state PENDING
@@ -2080,7 +2080,7 @@ class TestLauncher(tests.DBTestCase):
         # causing request handling to pause.
         self.waitForNodeRequest(req, (zk.PENDING,))
         pool_worker = pool.getPoolWorkers('fake-provider')
-        while not pool_worker[0].paused_handler:
+        while not pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
         self.assertTrue(mock_invalidatequotacache.called)
@@ -2218,11 +2218,12 @@ class TestLauncher(tests.DBTestCase):
         # Force an exception within the run handler.
         pool_worker = pool.getPoolWorkers('fake-provider')
-        while not pool_worker[0].paused_handler:
+        while not pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
-        pool_worker[0].paused_handler.hasProviderQuota = mock.Mock(
-            side_effect=Exception('mock exception')
-        )
+        for rh in pool_worker[0].paused_handlers:
+            rh.hasProviderQuota = mock.Mock(
+                side_effect=Exception('mock exception')
+            )
 
         # The above exception should cause us to fail the paused request.
         req2 = self.waitForNodeRequest(req2, (zk.FAILED,))
@@ -2230,7 +2231,7 @@ class TestLauncher(tests.DBTestCase):
 
         # The exception handling should make sure that we unpause AND remove
         # the request handler.
-        while pool_worker[0].paused_handler:
+        while pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
         self.assertEqual(0, len(pool_worker[0].request_handlers))
@@ -2321,7 +2322,7 @@ class TestLauncher(tests.DBTestCase):
         self.zk.storeNodeRequest(req2)
 
         pool_worker = pool.getPoolWorkers('fake-provider')
-        while not pool_worker[0].paused_handler:
+        while not pool_worker[0].paused_handlers:
             time.sleep(0.1)
 
         # The handler is paused now and the request should be in state PENDING
@@ -2413,6 +2414,60 @@ class TestLauncher(tests.DBTestCase):
         req = self.waitForNodeRequest(req)
         self.assertEqual(req.state, zk.FULFILLED)
 
+    def test_multiple_paused_requests(self):
+        """Test that multiple paused requests are fulfilled in order."""
+        max_instances = 0
+
+        def fake_get_quota():
+            nonlocal max_instances
+            return (100, max_instances, 1000000)
+
+        self.useFixture(fixtures.MockPatchObject(
+            fakeprovider.FakeProvider.fake_cloud, '_get_quota',
+            fake_get_quota
+        ))
+
+        req1 = zk.NodeRequest()
+        req1.state = zk.REQUESTED
+        req1.node_types.append('fake-label')
+        self.zk.storeNodeRequest(req1)
+
+        req2 = zk.NodeRequest()
+        req2.state = zk.REQUESTED
+        req2.node_types.append('fake-label')
+        self.zk.storeNodeRequest(req2)
+
+        configfile = self.setup_config('ignore_provider_quota_true.yaml')
+        self.useBuilder(configfile)
+        self.waitForImage('fake-provider', 'fake-image')
+
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.waitForAnyNodeInState(zk.ABORTED)
+
+        pool_worker = pool.getPoolWorkers('fake-provider')
+        while not len(pool_worker[0].paused_handlers) == 2:
+            time.sleep(0.1)
+
+        # Bump up the quota to allow the provider to allocate a node
+        max_instances = 1
+        req1 = self.waitForNodeRequest(req1)
+        self.assertEqual(req1.state, zk.FULFILLED)
+
+        req2 = self.waitForNodeRequest(req2, zk.PENDING)
+
+        # Release the node allocated to the first request
+        req1_node = self.zk.getNode(req1.nodes[0])
+        self.zk.lockNode(req1_node, blocking=False)
+
+        req1_node.state = zk.USED
+        self.zk.storeNode(req1_node)
+        self.zk.unlockNode(req1_node)
+        self.waitForNodeDeletion(req1_node)
+
+        self.waitForNodeRequest(req2, zk.FULFILLED)
+
     def test_request_order(self):
         """Test that requests are handled in sorted order"""
         configfile = self.setup_config('node_no_min_ready.yaml')