Add support for max_concurrency for providers

Add the capability to limit the number of node requests being handled
simultaneously for a provider. The default does not force any limit.

Change-Id: I49a2638c8003614ab4dc287d157abe873da81421
This commit is contained in:
David Shrewsbury 2017-01-12 14:50:20 -05:00
parent 22c35e0756
commit 8fd7744935
5 changed files with 63 additions and 24 deletions

View File

@ -438,6 +438,12 @@ provider, the Nodepool image types are also defined (see
OpenStack project and will attempt to clean unattached floating ips that
may have leaked around restarts.
``max-concurrency``
Maximum number of node requests that this provider is allowed to handle
concurrently. The default, if not specified, is to have no maximum. Since
each node request is handled by a separate thread, this can be useful for
limiting the number of threads used by the nodepoold daemon.
.. _images:
images

View File

@ -66,6 +66,7 @@ class ConfigValidator:
'project-id': str,
'project-name': str,
'max-servers': int,
'max-concurrency': int,
'pool': str,
'image-type': str,
'networks': [v.Any(old_network, network)],

View File

@ -50,6 +50,7 @@ class Provider(ConfigValue):
other.networks != self.networks or
other.ipv6_preferred != self.ipv6_preferred or
other.clean_floating_ips != self.clean_floating_ips or
other.max_concurrency != self.max_concurrency or
other.azs != self.azs):
return False
new_images = other.images
@ -174,6 +175,7 @@ def loadConfig(config_path):
p.cloud_config = _get_one_cloud(cloud_config, cloud_kwargs)
p.region_name = provider.get('region-name')
p.max_servers = provider['max-servers']
p.max_concurrency = provider.get('max-concurrency', -1)
p.keypair = provider.get('keypair', None)
p.pool = provider.get('pool', None)
p.rate = provider.get('rate', 1.0)

View File

@ -728,6 +728,7 @@ class ProviderWorker(threading.Thread):
self.zk = zk
self.running = False
self.configfile = configfile
self.workers = []
#----------------------------------------------------------------
# Private methods
@ -753,6 +754,54 @@ class ProviderWorker(threading.Thread):
else:
self.provider = config.providers[self.provider.name]
def _processRequests(self):
self.log.debug("Getting node request from ZK queue")
for req_id in self.zk.getNodeRequests():
# Short-circuit for limited request handling
if (self.provider.max_concurrency > 0
and self._activeWorkers() >= self.provider.max_concurrency
):
return
req = self.zk.getNodeRequest(req_id)
if not req:
continue
# Only interested in unhandled requests
if req.state != zk.REQUESTED:
continue
try:
self.zk.lockNodeRequest(req, blocking=False)
except exceptions.ZKLockException:
continue
# Make sure the state didn't change on us
if req.state != zk.REQUESTED:
self.zk.unlockNodeRequest(req)
continue
# Got a lock, so assign it
self.log.info("Assigning node request %s" % req.id)
t = NodeRequestWorker(self.zk, req)
t.start()
self.workers.append(t)
def _activeWorkers(self):
'''
Return a count of the number of requests actively being handled.
This serves the dual-purpose of also removing completed requests from
our list of tracked threads.
'''
active = []
for w in self.workers:
if w.isAlive():
active.append(w)
self.workers = active
return len(self.workers)
#----------------------------------------------------------------
# Public methods
#----------------------------------------------------------------
@ -761,31 +810,11 @@ class ProviderWorker(threading.Thread):
self.running = True
while self.running:
self.log.debug("Getting node request from ZK queue")
if self.provider.max_concurrency == -1 and self.workers:
self.workers = []
for req_id in self.zk.getNodeRequests():
req = self.zk.getNodeRequest(req_id)
if not req:
continue
# Only interested in unhandled requests
if req.state != zk.REQUESTED:
continue
try:
self.zk.lockNodeRequest(req, blocking=False)
except exceptions.ZKLockException:
continue
# Make sure the state didn't change on us
if req.state != zk.REQUESTED:
self.zk.unlockNodeRequest(req)
continue
# Got a lock, so assign it
self.log.info("Assigning node request %s" % req.id)
t = NodeRequestWorker(self.zk, req)
t.start()
if self.provider.max_concurrency != 0:
self._processRequests()
time.sleep(10)
self._updateProvider()

View File

@ -38,6 +38,7 @@ providers:
auth-url: 'https://identity.example.com/v2.0/'
boot-timeout: 120
max-servers: 184
max-concurrency: 10
rate: 0.001
images:
- name: trusty