Prioritize requests with labels that can be served
We noticed that a provider sometimes takes very long to serve a request because it is busy declining requests with a higher priority. This is made worse by the fact that we stop processing requests to remove completed handlers every 15s. Afterwards we will start over and also include new requests. This means, that as long as there are requests with a higher priority we will process them first even if they need to be declined because the pool doesn't provide the requested label(s). To improve this we will first sort the open node requests based on whether they can be fulfilled by the current provider and only then by priority. This has the added benefit that a provider might not even need to decline a request if it could be fulfilled by another provider in the meantime. Change-Id: Id04f57ff1e4c28357370729b6383f5119cd616dc
This commit is contained in:
parent
3bdd185904
commit
ed445367f4
@ -16,7 +16,6 @@
|
||||
|
||||
import logging
|
||||
import math
|
||||
import operator
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
@ -110,8 +109,18 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
# which express a preference for a specific provider.
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
|
||||
requests = sorted(self.zk.nodeRequestIterator(),
|
||||
key=operator.attrgetter("priority"))
|
||||
pool = self.getPoolConfig()
|
||||
pool_labels = set(pool.labels)
|
||||
|
||||
def _sort_key(request):
|
||||
missing_labels = set(request.node_types) - pool_labels
|
||||
# Return a tuple with the number of labels that are *not* served by
|
||||
# this pool and the request priority. This will sort the requests
|
||||
# that we can serve (no missing labels) before the requests that we
|
||||
# need to decline (missing labels > 0).
|
||||
return len(missing_labels), request.priority
|
||||
|
||||
requests = sorted(self.zk.nodeRequestIterator(), key=_sort_key)
|
||||
for req in requests:
|
||||
if not self.running:
|
||||
return True
|
||||
|
@ -2276,6 +2276,56 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.assertTrue(req2.id > req1.id)
|
||||
self.assertTrue(req2.state_time < req1.state_time)
|
||||
|
||||
def test_request_order_missing_label(self):
|
||||
"""Test that requests that can be be fulfilled are prioritized over
|
||||
requests that need to be rejected (based on the request labels).
|
||||
"""
|
||||
configfile = self.setup_config('node_no_min_ready.yaml')
|
||||
self.useBuilder(configfile)
|
||||
image = self.waitForImage('fake-provider', 'fake-image')
|
||||
self.assertEqual(image.username, 'fake-username')
|
||||
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
pool.start()
|
||||
self.wait_for_config(pool)
|
||||
|
||||
# Get the list of request handlers to assert the correct processing
|
||||
# order. The timestamps of the requests won't work for that as the
|
||||
# request to be rejected will be handled faster most of the time.
|
||||
while True:
|
||||
workers = pool.getPoolWorkers('fake-provider')
|
||||
if workers:
|
||||
pool_worker = workers[0]
|
||||
break
|
||||
time.sleep(0.1)
|
||||
request_handlers = pool_worker.request_handlers
|
||||
|
||||
# Request with a higher relative priority coming in first, but
|
||||
# requesting a label that is not available.
|
||||
req1 = zk.NodeRequest()
|
||||
req1.state = zk.REQUESTED
|
||||
req1.node_types.append('not-available')
|
||||
req1.relative_priority = 1
|
||||
self.zk.storeNodeRequest(req1)
|
||||
|
||||
# Later request with a lower relative priority, but a valid label
|
||||
# which should be handled first.
|
||||
req2 = zk.NodeRequest()
|
||||
req2.state = zk.REQUESTED
|
||||
req2.node_types.append('fake-label')
|
||||
req2.relative_priority = 2
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
req2 = self.waitForNodeRequest(req2)
|
||||
self.assertEqual(req2.state, zk.FULFILLED)
|
||||
req1 = self.waitForNodeRequest(req1)
|
||||
self.assertEqual(req1.state, zk.FAILED)
|
||||
|
||||
self.assertGreater(req2.id, req1.id)
|
||||
self.assertEqual(len(request_handlers), 2)
|
||||
self.assertEqual(request_handlers[0].request.id, req2.id)
|
||||
self.assertEqual(request_handlers[1].request.id, req1.id)
|
||||
|
||||
def test_empty_node_deleted(self):
|
||||
"""Test that empty nodes are deleted by the cleanup thread"""
|
||||
configfile = self.setup_config('node.yaml')
|
||||
|
Loading…
x
Reference in New Issue
Block a user