AWS driver create/delete improvements

The default AWS rate limit is 2 instances/sec, but in practice, we
can achieve something like 0.6 instances/sec with the current code.
That's because the create instance REST API call itself takes more
than a second to return.  To achieve even the default AWS rate
(much less a potentially faster one which may be obtainable via
support request), we need to alter the approach.  This change does
the following:

* Parallelizes create API calls.  We create a threadpool with
  (typically) 8 workers to execute create instance calls in the
  background.  2 or 3 workers should be sufficient to meet the
  2/sec rate; more allows for the occasional longer execution time
  as well as a customized higher rate.  We max out at 8 to protect
  nodepool from too many threads.
* The state machine uses the new background create calls instead
  of synchronously creating instances.  This allows other state
  machines to progress further (i.e., advance to ssh keyscan faster
  in the case of a rush of requests).  A minimal sketch of this
  pattern appears after this list.
* Delete calls are batched.  They don't take as long as create calls,
  but each one still consumes a rate limit slot which could otherwise
  be used for creating instances.  By batching deletes, we make more
  room for creates.
* A bug in the RateLimiter could cause it not to record the initial
  time and therefore never actually rate limit.  This is fixed.
* The RateLimiter is now thread-safe.
* The default rate limit for AWS is changed to 2 requests/sec.
* Documentation for the 'rate' parameter for the AWS driver is added.
* Documentation for the 'rate' parameter for the Azure driver is
  corrected to describe the rate as requests/sec instead of delay
  between requests.
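
As an illustration of the first two points above, here is a minimal,
self-contained sketch of the submit-then-poll pattern.  It is
illustrative only; the class and method names below are invented for
the example and are not the driver's actual API:

    # Sketch: a slow create call runs in a thread pool and the state
    # machine polls a Future instead of blocking on the API call.
    from concurrent.futures import ThreadPoolExecutor
    import time


    class SketchAdapter:
        def __init__(self, rate=2.0):
            # A few workers sustain the rate; cap the count to limit threads.
            workers = max(min(int(rate * 4), 8), 1)
            self.executor = ThreadPoolExecutor(max_workers=workers)

        def submit_create(self, hostname):
            # Returns immediately; the slow call runs in the background.
            return self.executor.submit(self._create, hostname)

        def _create(self, hostname):
            time.sleep(1.2)          # stand-in for a slow create API call
            return f"instance-for-{hostname}"


    class SketchCreateStateMachine:
        START, SUBMITTED, DONE = 'start', 'submitted', 'done'

        def __init__(self, adapter, hostname):
            self.adapter, self.hostname = adapter, hostname
            self.state, self.future, self.instance = self.START, None, None

        def advance(self):
            # Called repeatedly by the launcher; never blocks on the create.
            if self.state == self.START:
                self.future = self.adapter.submit_create(self.hostname)
                self.state = self.SUBMITTED
            if self.state == self.SUBMITTED:
                if not self.future.done():
                    return            # let other state machines progress
                self.instance = self.future.result()
                self.state = self.DONE

With this shape, a burst of requests submits all of its create calls up
front and each state machine then advances (keyscan, etc.) as soon as
its own future completes.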

Change-Id: Ida2cbc59928e183eb7da275ff26d152eae784cfe
James E. Blair 2022-06-17 09:55:25 -07:00
parent 1b1eab77b0
commit d5b0dee642
8 changed files with 205 additions and 33 deletions

@@ -87,6 +87,12 @@ Selecting the ``aws`` driver adds the following options to the

       See `Boto Configuration`_ for more information.

+   .. attr:: rate
+      :type: float
+      :default: 2.0
+
+      The number of operations per second to perform against the provider.
+
    .. attr:: boot-timeout
       :type: int seconds
       :default: 180

@@ -167,10 +167,10 @@ section of the configuration.
       platforms.  The default value is true.

    .. attr:: rate
-      :type: float seconds
+      :type: float
       :default: 1.0

-      In seconds, amount to wait between operations on the provider.
+      The number of operations per second to perform against the provider.

    .. attr:: boot-timeout
       :type: int seconds

@@ -13,19 +13,22 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+from concurrent.futures import ThreadPoolExecutor
+import cachetools.func
 import json
 import logging
 import math
-import cachetools.func
-import urllib.parse
-import time
+import queue
 import re
+import threading
+import time
+import urllib.parse

-import boto3
-
 from nodepool.driver.utils import QuotaInformation, RateLimiter
 from nodepool.driver import statemachine

+import boto3
+

 def tag_dict_to_list(tagdict):
     # TODO: validate tag values are strings in config and deprecate
@@ -71,9 +74,6 @@ class AwsResource(statemachine.Resource):

 class AwsDeleteStateMachine(statemachine.StateMachine):
     VM_DELETING = 'deleting vm'
-    NIC_DELETING = 'deleting nic'
-    PIP_DELETING = 'deleting pip'
-    DISK_DELETING = 'deleting disk'
     COMPLETE = 'complete'

     def __init__(self, adapter, external_id, log):
@@ -98,6 +98,7 @@ class AwsDeleteStateMachine(statemachine.StateMachine):

 class AwsCreateStateMachine(statemachine.StateMachine):
+    INSTANCE_CREATING_SUBMIT = 'submit creating instance'
     INSTANCE_CREATING = 'creating instance'
     INSTANCE_RETRY = 'retrying instance creation'
     COMPLETE = 'complete'
@@ -124,10 +125,16 @@ class AwsCreateStateMachine(statemachine.StateMachine):
     def advance(self):
         if self.state == self.START:
             self.external_id = self.hostname
-            self.instance = self.adapter._createInstance(
+            self.create_future = self.adapter._submitCreateInstance(
                 self.label, self.image_external_id,
                 self.tags, self.hostname, self.log)
+            self.state = self.INSTANCE_CREATING_SUBMIT
+
+        if self.state == self.INSTANCE_CREATING_SUBMIT:
+            instance = self.adapter._completeCreateInstance(self.create_future)
+            if instance is None:
+                return
+            self.instance = instance
             self.quota = self.adapter._getQuotaForInstanceType(
                 self.instance.instance_type)
             self.state = self.INSTANCE_CREATING
@@ -142,7 +149,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
                     raise Exception("Too many retries")
                 self.attempts += 1
                 self.instance = self.adapter._deleteInstance(
-                    self.external_id, self.log)
+                    self.external_id, self.log, immediate=True)
                 self.state = self.INSTANCE_RETRY
             else:
                 return
@@ -165,7 +172,39 @@ class AwsAdapter(statemachine.Adapter):
         self.log = logging.getLogger(
             f"nodepool.AwsAdapter.{provider_config.name}")
         self.provider = provider_config
-        # The standard rate limit, this might be 1 request per second
+        self._running = True
+
+        # AWS has a default rate limit for creating instances that
+        # works out to a sustained 2 instances/sec, but the actual
+        # create instance API call takes 1 second or more.  If we want
+        # to achieve faster than 1 instance/second throughput, we need
+        # to parallelize create instance calls, so we set up a
+        # threadworker to do that.
+
+        # A little bit of a heuristic here to set the worker count.
+        # It appears that AWS typically takes 1-1.5 seconds to execute
+        # a create API call.  Figure out how many we have to do in
+        # parallel in order to run at the rate limit, then quadruple
+        # that for headroom.  Max out at 8 so we don't end up with too
+        # many threads.  In practice, this will be 8 with the default
+        # values, and only less if users slow down the rate.
+        workers = max(min(int(self.provider.rate * 4), 8), 1)
+        self.log.info("Create executor with max workers=%s", workers)
+        self.create_executor = ThreadPoolExecutor(max_workers=workers)
+
+        # We can batch delete instances using the AWS API, so to do
+        # that, create a queue for deletes, and a thread to process
+        # the queue.  It will be greedy and collect as many pending
+        # instance deletes as possible to delete together.  Typically
+        # under load, that will mean a single instance delete followed
+        # by larger batches.  That strikes a balance between
+        # responsiveness and efficiency.  Reducing the overall number
+        # of requests leaves more time for create instance calls.
+        self.delete_queue = queue.Queue()
+        self.delete_thread = threading.Thread(target=self._deleteThread)
+        self.delete_thread.daemon = True
+        self.delete_thread.start()
+
         self.rate_limiter = RateLimiter(self.provider.name,
                                         self.provider.rate)
         # Non mutating requests can be made more often at 10x the rate
@@ -190,6 +229,10 @@ class AwsAdapter(statemachine.Adapter):
         self.not_our_images = set()
         self.not_our_snapshots = set()

+    def stop(self):
+        self.create_executor.shutdown()
+        self._running = False
+
     def getCreateStateMachine(self, hostname, label,
                               image_external_id, metadata, retries, log):
         return AwsCreateStateMachine(self, hostname, label,
@@ -232,7 +275,7 @@ class AwsAdapter(statemachine.Adapter):
     def deleteResource(self, resource):
         self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
         if resource.type == 'instance':
-            self._deleteInstance(resource.id)
+            self._deleteInstance(resource.id, immediate=True)
         if resource.type == 'volume':
             self._deleteVolume(resource.id)
         if resource.type == 'ami':
@@ -519,6 +562,18 @@ class AwsAdapter(statemachine.Adapter):
         with self.non_mutating_rate_limiter:
             return self.ec2.Image(image_id)

+    def _submitCreateInstance(self, label, image_external_id,
+                              tags, hostname, log):
+        return self.create_executor.submit(
+            self._createInstance,
+            label, image_external_id,
+            tags, hostname, log)
+
+    def _completeCreateInstance(self, future):
+        if not future.done():
+            return None
+        return future.result()
+
     def _createInstance(self, label, image_external_id,
                         tags, hostname, log):
         if image_external_id:
@@ -600,7 +655,38 @@ class AwsAdapter(statemachine.Adapter):
         log.debug(f"Created VM {hostname} as instance {instances[0].id}")
         return instances[0]

-    def _deleteInstance(self, external_id, log=None):
+    def _deleteThread(self):
+        while self._running:
+            try:
+                self._deleteThreadInner()
+            except Exception:
+                self.log.exception("Error in delete thread:")
+                time.sleep(5)
+
+    def _deleteThreadInner(self):
+        records = []
+        try:
+            records.append(self.delete_queue.get(block=True, timeout=10))
+        except queue.Empty:
+            return
+        while True:
+            try:
+                records.append(self.delete_queue.get(block=False))
+            except queue.Empty:
+                break
+            # The terminate call has a limit of 1k, but AWS recommends
+            # smaller batches.  We limit to 50 here.
+            if len(records) >= 50:
+                break
+        ids = []
+        for (del_id, log) in records:
+            ids.append(del_id)
+            log.debug(f"Deleting instance {del_id}")
+        count = len(ids)
+        with self.rate_limiter(log.debug, f"Deleted {count} instances"):
+            self.ec2_client.terminate_instances(InstanceIds=ids)
+
+    def _deleteInstance(self, external_id, log=None, immediate=False):
         if log is None:
             log = self.log
         for instance in self._listInstances():
@@ -609,9 +695,12 @@ class AwsAdapter(statemachine.Adapter):
         else:
             log.warning(f"Instance not found when deleting {external_id}")
             return None
-        with self.rate_limiter(log.debug, "Deleted instance"):
-            log.debug(f"Deleting instance {external_id}")
-            instance.terminate()
+        if immediate:
+            with self.rate_limiter(log.debug, "Deleted instance"):
+                log.debug(f"Deleting instance {external_id}")
+                instance.terminate()
+        else:
+            self.delete_queue.put((external_id, log))
         return instance

     def _deleteVolume(self, external_id):
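
The delete thread above drains its queue greedily.  As a standalone
illustration of that batching pattern (not the driver code; the
terminate_batch() function here is just a stand-in for the single
ec2_client.terminate_instances() request):

    # Sketch: collect one item with a blocking get, then greedily drain
    # whatever else is already queued, up to a conservative batch size.
    import queue
    import threading
    import time


    def terminate_batch(ids):
        print(f"terminating {len(ids)} instances: {ids}")


    def delete_worker(delete_queue, running):
        while running.is_set():
            try:
                ids = [delete_queue.get(block=True, timeout=1)]
            except queue.Empty:
                continue
            while len(ids) < 50:
                try:
                    ids.append(delete_queue.get(block=False))
                except queue.Empty:
                    break
            terminate_batch(ids)   # one API request instead of len(ids)


    if __name__ == '__main__':
        q, running = queue.Queue(), threading.Event()
        running.set()
        threading.Thread(target=delete_worker, args=(q, running),
                         daemon=True).start()
        for i in range(7):
            q.put(f"i-{i:04d}")
        time.sleep(2)
        running.clear()

Under load this typically yields one small batch followed by larger
ones, which is the responsiveness/efficiency trade-off described in the
comments above.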

@@ -251,7 +251,7 @@ class AwsProviderConfig(ProviderConfig):
         self.profile_name = self.provider.get('profile-name')
         self.region_name = self.provider.get('region-name')

-        self.rate = self.provider.get('rate', 1)
+        self.rate = self.provider.get('rate', 2)
         self.launch_retries = self.provider.get('launch-retries', 3)
         self.launch_timeout = self.provider.get('launch-timeout', 3600)
         self.boot_timeout = self.provider.get('boot-timeout', 180)

@@ -493,6 +493,7 @@ class StateMachineProvider(Provider, QuotaSupport):
         self.running = False
         if self.keyscan_worker:
             self.keyscan_worker.shutdown()
+        self.adapter.stop()
         self.log.debug("Stopped")

     def join(self):
@@ -821,6 +822,10 @@ class Adapter:
     def __init__(self, provider_config):
         pass

+    def stop(self):
+        """Release any resources as this provider is being stopped"""
+        pass
+
     def getCreateStateMachine(self, hostname, label,
                               image_external_id, metadata, retries,
                               log):

@@ -430,8 +430,12 @@ class RateLimiter:
     def __init__(self, name, rate_limit):
         self._running = True
         self.name = name
-        self.delta = 1.0 / rate_limit
+        if not rate_limit:
+            self.delta = 0.0
+        else:
+            self.delta = 1.0 / rate_limit
         self.last_ts = None
+        self.lock = threading.Lock()

     def __call__(self, logmethod, msg):
         return RateLimitInstance(self, logmethod, msg)
@@ -440,19 +444,21 @@
         self._enter()

     def _enter(self):
-        total_delay = 0.0
-        if self.last_ts is None:
-            return total_delay
-        while True:
-            now = time.monotonic()
-            delta = now - self.last_ts
-            if delta >= self.delta:
-                break
-            delay = self.delta - delta
-            time.sleep(delay)
-            total_delay += delay
-        self.last_ts = time.monotonic()
-        return total_delay
+        with self.lock:
+            total_delay = 0.0
+            if self.last_ts is None:
+                self.last_ts = time.monotonic()
+                return total_delay
+            while True:
+                now = time.monotonic()
+                delta = now - self.last_ts
+                if delta >= self.delta:
+                    break
+                delay = self.delta - delta
+                time.sleep(delay)
+                total_delay += delay
+            self.last_ts = time.monotonic()
+            return total_delay

     def __exit__(self, etype, value, tb):
         self._exit(etype, value, tb)
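
For reference, here is a simplified stand-in (not nodepool's
RateLimiter itself) showing the intended semantics: rate is requests
per second, and the lock keeps the spacing correct when multiple
threads share one limiter:

    import threading
    import time


    class SimpleRateLimiter:
        def __init__(self, rate):
            # rate is requests/sec; 0 or None disables limiting.
            self.delta = 0.0 if not rate else 1.0 / rate
            self.last_ts = None
            self.lock = threading.Lock()

        def wait(self):
            with self.lock:
                now = time.monotonic()
                if self.last_ts is not None:
                    remaining = self.delta - (now - self.last_ts)
                    if remaining > 0:
                        time.sleep(remaining)
                # Record the time even on the first call, so the second
                # call is spaced correctly (the bug fixed above).
                self.last_ts = time.monotonic()


    if __name__ == '__main__':
        limiter = SimpleRateLimiter(rate=2.0)   # >= 0.5s between calls

        def request(n):
            limiter.wait()
            print(f"request {n} at {time.monotonic():.2f}")

        threads = [threading.Thread(target=request, args=(i,))
                   for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()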

@@ -0,0 +1,38 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+zookeeper-tls:
+  ca: {zookeeper_ca}
+  cert: {zookeeper_cert}
+  key: {zookeeper_key}
+
+tenant-resource-limits:
+  - tenant-name: tenant-1
+    max-cores: 1024
+
+labels:
+  - name: ubuntu1404
+
+providers:
+  - name: ec2-us-west-2
+    driver: aws
+    region-name: us-west-2
+    cloud-images:
+      - name: ubuntu1404
+        image-id: ami-1e749f67
+        username: ubuntu
+    pools:
+      - name: main
+        max-servers: 10
+        subnet-id: {subnet_id}
+        security-group-id: {security_group_id}
+        node-attributes:
+          key1: value1
+          key2: value2
+        labels:
+          - name: ubuntu1404
+            cloud-image: ubuntu1404
+            instance-type: t3.medium
+            key-name: zuul

@@ -176,6 +176,34 @@ class TestDriverAws(tests.DBTestCase):
                          {'key1': 'value1', 'key2': 'value2'})
         return node

+    def test_aws_multiple(self):
+        # Test creating multiple instances at once.  This is most
+        # useful to run manually during development to observe
+        # behavior.
+        configfile = self.setup_config('aws/aws-multiple.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+        self.patchProvider(pool)
+
+        reqs = []
+        for x in range(4):
+            req = zk.NodeRequest()
+            req.state = zk.REQUESTED
+            req.node_types.append('ubuntu1404')
+            self.zk.storeNodeRequest(req)
+            reqs.append(req)
+
+        nodes = []
+        for req in reqs:
+            self.log.debug("Waiting for request %s", req.id)
+            req = self.waitForNodeRequest(req)
+            nodes.append(self.assertSuccess(req))
+        for node in nodes:
+            node.state = zk.USED
+            self.zk.storeNode(node)
+        for node in nodes:
+            self.waitForNodeDeletion(node)
+
     def test_aws_node(self):
         req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
         node = self.assertSuccess(req)