From 8fd774493515a609bb6c7d057ccc6831456236eb Mon Sep 17 00:00:00 2001
From: David Shrewsbury
Date: Thu, 12 Jan 2017 14:50:20 -0500
Subject: [PATCH] Add support for max_concurrency for providers

Add the capability to limit the number of node requests being handled
simultaneously for a provider. The default does not force any limit.

Change-Id: I49a2638c8003614ab4dc287d157abe873da81421
---
 doc/source/configuration.rst                      |  6 ++
 nodepool/cmd/config_validator.py                  |  1 +
 nodepool/config.py                                |  2 +
 nodepool/nodepool.py                              | 77 +++++++++++++------
 .../tests/fixtures/config_validate/good.yaml      |  1 +
 5 files changed, 63 insertions(+), 24 deletions(-)

diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst
index 2fc12d07a..9fb6bbf78 100644
--- a/doc/source/configuration.rst
+++ b/doc/source/configuration.rst
@@ -438,6 +438,12 @@ provider, the Nodepool image types are also defined (see
     OpenStack project and will attempt to clean unattached floating ips that
     may have leaked around restarts.
 
+  ``max-concurrency``
+    Maximum number of node requests that this provider is allowed to handle
+    concurrently. The default, if not specified, is to have no maximum. Since
+    each node request is handled by a separate thread, this can be useful for
+    limiting the number of threads used by the nodepoold daemon.
+
 .. _images:
 
 images
diff --git a/nodepool/cmd/config_validator.py b/nodepool/cmd/config_validator.py
index 1b8b32f9f..dd3102f01 100644
--- a/nodepool/cmd/config_validator.py
+++ b/nodepool/cmd/config_validator.py
@@ -66,6 +66,7 @@ class ConfigValidator:
             'project-id': str,
             'project-name': str,
             'max-servers': int,
+            'max-concurrency': int,
             'pool': str,
             'image-type': str,
             'networks': [v.Any(old_network, network)],
diff --git a/nodepool/config.py b/nodepool/config.py
index bb482225d..3db182275 100644
--- a/nodepool/config.py
+++ b/nodepool/config.py
@@ -50,6 +50,7 @@ class Provider(ConfigValue):
                 other.networks != self.networks or
                 other.ipv6_preferred != self.ipv6_preferred or
                 other.clean_floating_ips != self.clean_floating_ips or
+                other.max_concurrency != self.max_concurrency or
                 other.azs != self.azs):
             return False
         new_images = other.images
@@ -174,6 +175,7 @@ def loadConfig(config_path):
         p.cloud_config = _get_one_cloud(cloud_config, cloud_kwargs)
         p.region_name = provider.get('region-name')
         p.max_servers = provider['max-servers']
+        p.max_concurrency = provider.get('max-concurrency', -1)
         p.keypair = provider.get('keypair', None)
         p.pool = provider.get('pool', None)
         p.rate = provider.get('rate', 1.0)
diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py
index 2c080c849..5a56b1ddd 100644
--- a/nodepool/nodepool.py
+++ b/nodepool/nodepool.py
@@ -728,6 +728,7 @@ class ProviderWorker(threading.Thread):
         self.zk = zk
         self.running = False
         self.configfile = configfile
+        self.workers = []
 
     #----------------------------------------------------------------
     # Private methods
@@ -753,6 +754,54 @@ class ProviderWorker(threading.Thread):
         else:
             self.provider = config.providers[self.provider.name]
 
+    def _processRequests(self):
+        self.log.debug("Getting node request from ZK queue")
+
+        for req_id in self.zk.getNodeRequests():
+            # Short-circuit for limited request handling
+            if (self.provider.max_concurrency > 0
+                and self._activeWorkers() >= self.provider.max_concurrency
+            ):
+                return
+
+            req = self.zk.getNodeRequest(req_id)
+            if not req:
+                continue
+
+            # Only interested in unhandled requests
+            if req.state != zk.REQUESTED:
+                continue
+
+            try:
+                self.zk.lockNodeRequest(req, blocking=False)
+            except exceptions.ZKLockException:
+                continue
+
+            # Make sure the state didn't change on us
+            if req.state != zk.REQUESTED:
+                self.zk.unlockNodeRequest(req)
+                continue
+
+            # Got a lock, so assign it
+            self.log.info("Assigning node request %s" % req.id)
+            t = NodeRequestWorker(self.zk, req)
+            t.start()
+            self.workers.append(t)
+
+    def _activeWorkers(self):
+        '''
+        Return a count of the number of requests actively being handled.
+
+        This serves the dual purpose of also removing completed requests from
+        our list of tracked threads.
+        '''
+        active = []
+        for w in self.workers:
+            if w.isAlive():
+                active.append(w)
+        self.workers = active
+        return len(self.workers)
+
     #----------------------------------------------------------------
     # Public methods
     #----------------------------------------------------------------
@@ -761,31 +810,11 @@ def run(self):
         self.running = True
 
         while self.running:
-            self.log.debug("Getting node request from ZK queue")
+            if self.provider.max_concurrency == -1 and self.workers:
+                self.workers = []
 
-            for req_id in self.zk.getNodeRequests():
-                req = self.zk.getNodeRequest(req_id)
-                if not req:
-                    continue
-
-                # Only interested in unhandled requests
-                if req.state != zk.REQUESTED:
-                    continue
-
-                try:
-                    self.zk.lockNodeRequest(req, blocking=False)
-                except exceptions.ZKLockException:
-                    continue
-
-                # Make sure the state didn't change on us
-                if req.state != zk.REQUESTED:
-                    self.zk.unlockNodeRequest(req)
-                    continue
-
-                # Got a lock, so assign it
-                self.log.info("Assigning node request %s" % req.id)
-                t = NodeRequestWorker(self.zk, req)
-                t.start()
+            if self.provider.max_concurrency != 0:
+                self._processRequests()
 
             time.sleep(10)
             self._updateProvider()
diff --git a/nodepool/tests/fixtures/config_validate/good.yaml b/nodepool/tests/fixtures/config_validate/good.yaml
index 623a2f5ca..1ef7a67f8 100644
--- a/nodepool/tests/fixtures/config_validate/good.yaml
+++ b/nodepool/tests/fixtures/config_validate/good.yaml
@@ -38,6 +38,7 @@ providers:
       auth-url: 'https://identity.example.com/v2.0/'
       boot-timeout: 120
       max-servers: 184
+      max-concurrency: 10
       rate: 0.001
       images:
         - name: trusty
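
A note on the semantics of the change: max_concurrency defaults to -1,
meaning no limit is enforced (and the workers list is kept empty); any value
greater than 0 caps the number of in-flight NodeRequestWorker threads; and 0
stops the provider from handling requests entirely, since _processRequests()
is only called when max_concurrency != 0.

The gate itself reduces to a small pattern: track handler threads in a list,
prune finished ones while counting, and refuse new work once the count
reaches the limit. Below is a minimal standalone sketch of that pattern; the
Gate class and its submit() method are illustrative names only, not nodepool
APIs:

    # Sketch of the max-concurrency gating pattern used by ProviderWorker.
    # Gate and submit() are hypothetical names; nodepool's real handler
    # thread is NodeRequestWorker.
    import threading
    import time

    class Gate(object):
        def __init__(self, max_concurrency=-1):
            # -1 mirrors the provider default: no limit is enforced.
            self.max_concurrency = max_concurrency
            self.workers = []

        def _active_workers(self):
            # Count live threads, pruning completed ones as a side effect,
            # just as ProviderWorker._activeWorkers() does.
            self.workers = [w for w in self.workers if w.is_alive()]
            return len(self.workers)

        def submit(self, target):
            # Short-circuit when a limit is set and already reached.
            if (self.max_concurrency > 0
                    and self._active_workers() >= self.max_concurrency):
                return False
            t = threading.Thread(target=target)
            t.start()
            self.workers.append(t)
            return True

    gate = Gate(max_concurrency=2)
    results = [gate.submit(lambda: time.sleep(0.5)) for _ in range(5)]
    print(results)  # [True, True, False, False, False]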