diff --git a/doc/source/aws.rst b/doc/source/aws.rst index c93b78d9f..d2f611c81 100644 --- a/doc/source/aws.rst +++ b/doc/source/aws.rst @@ -87,6 +87,12 @@ Selecting the ``aws`` driver adds the following options to the See `Boto Configuration`_ for more information. + .. attr:: rate + :type: float + :default: 2.0 + + The number of operations per second to perform against the provider. + .. attr:: boot-timeout :type: int seconds :default: 180 diff --git a/doc/source/azure.rst b/doc/source/azure.rst index 3c0547294..20702cabc 100644 --- a/doc/source/azure.rst +++ b/doc/source/azure.rst @@ -167,10 +167,10 @@ section of the configuration. platforms. The default value is true. .. attr:: rate - :type: float seconds + :type: float :default: 1.0 - In seconds, amount to wait between operations on the provider. + The number of operations per second to perform against the provider. .. attr:: boot-timeout :type: int seconds diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py index 87b450df8..1110658f1 100644 --- a/nodepool/driver/aws/adapter.py +++ b/nodepool/driver/aws/adapter.py @@ -13,19 +13,22 @@ # License for the specific language governing permissions and limitations # under the License. +from concurrent.futures import ThreadPoolExecutor +import cachetools.func import json import logging import math -import cachetools.func -import urllib.parse -import time import re - -import boto3 +import threading +import queue +import time +import urllib.parse from nodepool.driver.utils import QuotaInformation, RateLimiter from nodepool.driver import statemachine +import boto3 + def tag_dict_to_list(tagdict): # TODO: validate tag values are strings in config and deprecate @@ -71,9 +74,6 @@ class AwsResource(statemachine.Resource): class AwsDeleteStateMachine(statemachine.StateMachine): VM_DELETING = 'deleting vm' - NIC_DELETING = 'deleting nic' - PIP_DELETING = 'deleting pip' - DISK_DELETING = 'deleting disk' COMPLETE = 'complete' def __init__(self, adapter, external_id, log): @@ -98,6 +98,7 @@ class AwsDeleteStateMachine(statemachine.StateMachine): class AwsCreateStateMachine(statemachine.StateMachine): + INSTANCE_CREATING_SUBMIT = 'submit creating instance' INSTANCE_CREATING = 'creating instance' INSTANCE_RETRY = 'retrying instance creation' COMPLETE = 'complete' @@ -124,10 +125,16 @@ class AwsCreateStateMachine(statemachine.StateMachine): def advance(self): if self.state == self.START: self.external_id = self.hostname - - self.instance = self.adapter._createInstance( + self.create_future = self.adapter._submitCreateInstance( self.label, self.image_external_id, self.tags, self.hostname, self.log) + self.state = self.INSTANCE_CREATING_SUBMIT + + if self.state == self.INSTANCE_CREATING_SUBMIT: + instance = self.adapter._completeCreateInstance(self.create_future) + if instance is None: + return + self.instance = instance self.quota = self.adapter._getQuotaForInstanceType( self.instance.instance_type) self.state = self.INSTANCE_CREATING @@ -142,7 +149,7 @@ class AwsCreateStateMachine(statemachine.StateMachine): raise Exception("Too many retries") self.attempts += 1 self.instance = self.adapter._deleteInstance( - self.external_id, self.log) + self.external_id, self.log, immediate=True) self.state = self.INSTANCE_RETRY else: return @@ -165,7 +172,39 @@ class AwsAdapter(statemachine.Adapter): self.log = logging.getLogger( f"nodepool.AwsAdapter.{provider_config.name}") self.provider = provider_config - # The standard rate limit, this might be 1 request per second + self._running = True + + 
# AWS has a default rate limit for creating instances that + # works out to a sustained 2 instances/sec, but the actual + # create instance API call takes 1 second or more. If we want + # to achieve faster than 1 instance/second throughput, we need + # to parallelize create instance calls, so we set up a + # threadworker to do that. + + # A little bit of a heuristic here to set the worker count. + # It appears that AWS typically takes 1-1.5 seconds to execute + # a create API call. Figure out how many we have to do in + # parallel in order to run at the rate limit, then quadruple + # that for headroom. Max out at 8 so we don't end up with too + # many threads. In practice, this will be 8 with the default + # values, and only less if users slow down the rate. + workers = max(min(int(self.provider.rate * 4), 8), 1) + self.log.info("Create executor with max workers=%s", workers) + self.create_executor = ThreadPoolExecutor(max_workers=workers) + + # We can batch delete instances using the AWS API, so to do + # that, create a queue for deletes, and a thread to process + # the queue. It will be greedy and collect as many pending + # instance deletes as possible to delete together. Typically + # under load, that will mean a single instance delete followed + # by larger batches. That strikes a balance between + # responsiveness and efficiency. Reducing the overall number + # of requests leaves more time for create instance calls. + self.delete_queue = queue.Queue() + self.delete_thread = threading.Thread(target=self._deleteThread) + self.delete_thread.daemon = True + self.delete_thread.start() + self.rate_limiter = RateLimiter(self.provider.name, self.provider.rate) # Non mutating requests can be made more often at 10x the rate @@ -190,6 +229,10 @@ class AwsAdapter(statemachine.Adapter): self.not_our_images = set() self.not_our_snapshots = set() + def stop(self): + self.create_executor.shutdown() + self._running = False + def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, log): return AwsCreateStateMachine(self, hostname, label, @@ -232,7 +275,7 @@ class AwsAdapter(statemachine.Adapter): def deleteResource(self, resource): self.log.info(f"Deleting leaked {resource.type}: {resource.id}") if resource.type == 'instance': - self._deleteInstance(resource.id) + self._deleteInstance(resource.id, immediate=True) if resource.type == 'volume': self._deleteVolume(resource.id) if resource.type == 'ami': @@ -519,6 +562,18 @@ class AwsAdapter(statemachine.Adapter): with self.non_mutating_rate_limiter: return self.ec2.Image(image_id) + def _submitCreateInstance(self, label, image_external_id, + tags, hostname, log): + return self.create_executor.submit( + self._createInstance, + label, image_external_id, + tags, hostname, log) + + def _completeCreateInstance(self, future): + if not future.done(): + return None + return future.result() + def _createInstance(self, label, image_external_id, tags, hostname, log): if image_external_id: @@ -600,7 +655,38 @@ class AwsAdapter(statemachine.Adapter): log.debug(f"Created VM {hostname} as instance {instances[0].id}") return instances[0] - def _deleteInstance(self, external_id, log=None): + def _deleteThread(self): + while self._running: + try: + self._deleteThreadInner() + except Exception: + self.log.exception("Error in delete thread:") + time.sleep(5) + + def _deleteThreadInner(self): + records = [] + try: + records.append(self.delete_queue.get(block=True, timeout=10)) + except queue.Empty: + return + while True: + try: + 
records.append(self.delete_queue.get(block=False)) + except queue.Empty: + break + # The terminate call has a limit of 1k, but AWS recommends + # smaller batches. We limit to 50 here. + if len(records) >= 50: + break + ids = [] + for (del_id, log) in records: + ids.append(del_id) + log.debug(f"Deleting instance {del_id}") + count = len(ids) + with self.rate_limiter(log.debug, f"Deleted {count} instances"): + self.ec2_client.terminate_instances(InstanceIds=ids) + + def _deleteInstance(self, external_id, log=None, immediate=False): if log is None: log = self.log for instance in self._listInstances(): @@ -609,9 +695,12 @@ class AwsAdapter(statemachine.Adapter): else: log.warning(f"Instance not found when deleting {external_id}") return None - with self.rate_limiter(log.debug, "Deleted instance"): - log.debug(f"Deleting instance {external_id}") - instance.terminate() + if immediate: + with self.rate_limiter(log.debug, "Deleted instance"): + log.debug(f"Deleting instance {external_id}") + instance.terminate() + else: + self.delete_queue.put((external_id, log)) return instance def _deleteVolume(self, external_id): diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py index 9b4592d65..71ad5a246 100644 --- a/nodepool/driver/aws/config.py +++ b/nodepool/driver/aws/config.py @@ -251,7 +251,7 @@ class AwsProviderConfig(ProviderConfig): self.profile_name = self.provider.get('profile-name') self.region_name = self.provider.get('region-name') - self.rate = self.provider.get('rate', 1) + self.rate = self.provider.get('rate', 2) self.launch_retries = self.provider.get('launch-retries', 3) self.launch_timeout = self.provider.get('launch-timeout', 3600) self.boot_timeout = self.provider.get('boot-timeout', 180) diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index 6bb3fcdc3..33e747ce0 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -493,6 +493,7 @@ class StateMachineProvider(Provider, QuotaSupport): self.running = False if self.keyscan_worker: self.keyscan_worker.shutdown() + self.adapter.stop() self.log.debug("Stopped") def join(self): @@ -821,6 +822,10 @@ class Adapter: def __init__(self, provider_config): pass + def stop(self): + """Release any resources as this provider is being stopped""" + pass + def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, log): diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py index 60cd85866..dea047223 100644 --- a/nodepool/driver/utils.py +++ b/nodepool/driver/utils.py @@ -430,8 +430,12 @@ class RateLimiter: def __init__(self, name, rate_limit): self._running = True self.name = name - self.delta = 1.0 / rate_limit + if not rate_limit: + self.delta = 0.0 + else: + self.delta = 1.0 / rate_limit self.last_ts = None + self.lock = threading.Lock() def __call__(self, logmethod, msg): return RateLimitInstance(self, logmethod, msg) @@ -440,19 +444,21 @@ class RateLimiter: self._enter() def _enter(self): - total_delay = 0.0 - if self.last_ts is None: + with self.lock: + total_delay = 0.0 + if self.last_ts is None: + self.last_ts = time.monotonic() + return total_delay + while True: + now = time.monotonic() + delta = now - self.last_ts + if delta >= self.delta: + break + delay = self.delta - delta + time.sleep(delay) + total_delay += delay + self.last_ts = time.monotonic() return total_delay - while True: - now = time.monotonic() - delta = now - self.last_ts - if delta >= self.delta: - break - delay = self.delta - delta - time.sleep(delay) - 
total_delay += delay - self.last_ts = time.monotonic() - return total_delay def __exit__(self, etype, value, tb): self._exit(etype, value, tb) diff --git a/nodepool/tests/fixtures/aws/aws-multiple.yaml b/nodepool/tests/fixtures/aws/aws-multiple.yaml new file mode 100644 index 000000000..0fdba177b --- /dev/null +++ b/nodepool/tests/fixtures/aws/aws-multiple.yaml @@ -0,0 +1,38 @@ +zookeeper-servers: + - host: {zookeeper_host} + port: {zookeeper_port} + chroot: {zookeeper_chroot} + +zookeeper-tls: + ca: {zookeeper_ca} + cert: {zookeeper_cert} + key: {zookeeper_key} + +tenant-resource-limits: + - tenant-name: tenant-1 + max-cores: 1024 + +labels: + - name: ubuntu1404 + +providers: + - name: ec2-us-west-2 + driver: aws + region-name: us-west-2 + cloud-images: + - name: ubuntu1404 + image-id: ami-1e749f67 + username: ubuntu + pools: + - name: main + max-servers: 10 + subnet-id: {subnet_id} + security-group-id: {security_group_id} + node-attributes: + key1: value1 + key2: value2 + labels: + - name: ubuntu1404 + cloud-image: ubuntu1404 + instance-type: t3.medium + key-name: zuul diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py index 209eb674f..3290d78ac 100644 --- a/nodepool/tests/unit/test_driver_aws.py +++ b/nodepool/tests/unit/test_driver_aws.py @@ -176,6 +176,34 @@ class TestDriverAws(tests.DBTestCase): {'key1': 'value1', 'key2': 'value2'}) return node + def test_aws_multiple(self): + # Test creating multiple instances at once. This is most + # useful to run manually during development to observe + # behavior. + configfile = self.setup_config('aws/aws-multiple.yaml') + pool = self.useNodepool(configfile, watermark_sleep=1) + pool.start() + self.patchProvider(pool) + + reqs = [] + for x in range(4): + req = zk.NodeRequest() + req.state = zk.REQUESTED + req.node_types.append('ubuntu1404') + self.zk.storeNodeRequest(req) + reqs.append(req) + + nodes = [] + for req in reqs: + self.log.debug("Waiting for request %s", req.id) + req = self.waitForNodeRequest(req) + nodes.append(self.assertSuccess(req)) + for node in nodes: + node.state = zk.USED + self.zk.storeNode(node) + for node in nodes: + self.waitForNodeDeletion(node) + def test_aws_node(self): req = self.requestNode('aws/aws.yaml', 'ubuntu1404') node = self.assertSuccess(req)
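
Taken together, the adapter changes combine two small concurrency patterns:
create calls are handed to a ThreadPoolExecutor and polled as futures so the
state machine's advance() method never blocks, and instance deletes are queued
and flushed in greedy batches through a single terminate call. The following
minimal sketch illustrates both patterns; fake_create_instance and
fake_terminate_instances are hypothetical stand-ins for the boto3 calls, not
part of the nodepool or boto3 APIs::

    import queue
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor


    def fake_create_instance(name):
        # Stand-in for the slow EC2 create call (typically 1-1.5 seconds).
        time.sleep(0.1)
        return f"i-{name}"


    def fake_terminate_instances(ids):
        # Stand-in for ec2_client.terminate_instances(InstanceIds=ids).
        print(f"terminated batch of {len(ids)} instances")


    # Pattern 1: submit the create call to a small thread pool and poll
    # the future so the caller never blocks waiting for the API.
    executor = ThreadPoolExecutor(max_workers=8)
    future = executor.submit(fake_create_instance, "node0")
    while not future.done():
        time.sleep(0.01)  # advance() would simply return and retry later
    print("created", future.result())
    executor.shutdown()

    # Pattern 2: greedy batching of deletes.  Block for the first record,
    # then drain whatever else is already queued, capped at 50 per call.
    delete_queue = queue.Queue()
    for n in range(120):
        delete_queue.put(f"i-{n}")


    def delete_batches():
        while True:
            try:
                ids = [delete_queue.get(block=True, timeout=1)]
            except queue.Empty:
                return
            while len(ids) < 50:
                try:
                    ids.append(delete_queue.get(block=False))
                except queue.Empty:
                    break
            fake_terminate_instances(ids)


    worker = threading.Thread(target=delete_batches)
    worker.start()
    worker.join()

Capping each batch at 50 mirrors the comment in the patch: the terminate call
accepts up to 1000 ids, but smaller batches keep individual requests cheap
while still cutting the total number of requests.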
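
For the RateLimiter rework itself, the key semantics are that ``rate`` is
expressed in operations per second (matching the updated docs), the minimum
spacing between calls is 1/rate, a falsy rate disables pausing, and the
last-timestamp bookkeeping is now guarded by a lock so multiple create and
delete threads share one budget. A simplified sketch under those assumptions
(not the actual nodepool.driver.utils.RateLimiter) might look like::

    import threading
    import time


    class SimpleRateLimiter:
        """Space calls at least 1/rate seconds apart across threads."""

        def __init__(self, rate):
            # A rate of 0 (or None) disables limiting entirely.
            self.delta = 0.0 if not rate else 1.0 / rate
            self.last_ts = None
            self.lock = threading.Lock()

        def __enter__(self):
            with self.lock:
                now = time.monotonic()
                if self.last_ts is not None:
                    wait = self.delta - (now - self.last_ts)
                    if wait > 0:
                        time.sleep(wait)
                self.last_ts = time.monotonic()

        def __exit__(self, etype, value, tb):
            return False


    # With the new AWS default of rate=2, mutating calls are spaced at
    # least 0.5 seconds apart, even when issued from several threads.
    limiter = SimpleRateLimiter(2.0)
    start = time.monotonic()
    for _ in range(4):
        with limiter:
            pass
    print(f"4 calls took {time.monotonic() - start:.1f}s")  # roughly 1.5s

With the default raised to 2 operations per second, mutating requests are
still paced at one per 0.5 seconds, which is why the create path needs
several worker threads to sustain that rate when each API call takes a
second or more.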