Merge "AWS driver create/delete improvements"

This commit is contained in:
Zuul 2022-06-30 23:24:02 +00:00 committed by Gerrit Code Review
commit a6768c9970
8 changed files with 205 additions and 33 deletions

View File

@ -87,6 +87,12 @@ Selecting the ``aws`` driver adds the following options to the
 See `Boto Configuration`_ for more information.

+.. attr:: rate
+   :type: float
+   :default: 2.0
+
+   The number of operations per second to perform against the provider.
+
 .. attr:: boot-timeout
    :type: int seconds
    :default: 180

View File

@ -167,10 +167,10 @@ section of the configuration.
 platforms. The default value is true.

 .. attr:: rate
-   :type: float seconds
+   :type: float
    :default: 1.0

-   In seconds, amount to wait between operations on the provider.
+   The number of operations per second to perform against the provider.

 .. attr:: boot-timeout
    :type: int seconds
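Under the new semantics, ``rate`` is a frequency rather than an interval: ``rate: 2.0`` allows one mutating API operation every ``1.0 / 2.0 = 0.5`` seconds, and raising the value speeds the driver up. At the default of ``1.0`` the old reading (one second between operations) and the new reading (one operation per second) happen to coincide, so configurations that relied on the default are unaffected.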

View File

@ -13,19 +13,22 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+from concurrent.futures import ThreadPoolExecutor
+import cachetools.func
 import json
 import logging
 import math
-import cachetools.func
-import urllib.parse
-import time
 import re
-import boto3
+import threading
+import queue
+import time
+import urllib.parse

 from nodepool.driver.utils import QuotaInformation, RateLimiter
 from nodepool.driver import statemachine

+import boto3

 def tag_dict_to_list(tagdict):
     # TODO: validate tag values are strings in config and deprecate
@ -71,9 +74,6 @@ class AwsResource(statemachine.Resource):
 class AwsDeleteStateMachine(statemachine.StateMachine):
     VM_DELETING = 'deleting vm'
-    NIC_DELETING = 'deleting nic'
-    PIP_DELETING = 'deleting pip'
-    DISK_DELETING = 'deleting disk'
     COMPLETE = 'complete'

     def __init__(self, adapter, external_id, log):
@ -98,6 +98,7 @@ class AwsDeleteStateMachine(statemachine.StateMachine):
 class AwsCreateStateMachine(statemachine.StateMachine):
+    INSTANCE_CREATING_SUBMIT = 'submit creating instance'
     INSTANCE_CREATING = 'creating instance'
     INSTANCE_RETRY = 'retrying instance creation'
     COMPLETE = 'complete'
@ -124,10 +125,16 @@ class AwsCreateStateMachine(statemachine.StateMachine):
     def advance(self):
         if self.state == self.START:
             self.external_id = self.hostname
-            self.instance = self.adapter._createInstance(
+            self.create_future = self.adapter._submitCreateInstance(
                 self.label, self.image_external_id,
                 self.tags, self.hostname, self.log)
+            self.state = self.INSTANCE_CREATING_SUBMIT
+
+        if self.state == self.INSTANCE_CREATING_SUBMIT:
+            instance = self.adapter._completeCreateInstance(self.create_future)
+            if instance is None:
+                return
+            self.instance = instance
             self.quota = self.adapter._getQuotaForInstanceType(
                 self.instance.instance_type)
             self.state = self.INSTANCE_CREATING
@ -142,7 +149,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
raise Exception("Too many retries") raise Exception("Too many retries")
self.attempts += 1 self.attempts += 1
self.instance = self.adapter._deleteInstance( self.instance = self.adapter._deleteInstance(
self.external_id, self.log) self.external_id, self.log, immediate=True)
self.state = self.INSTANCE_RETRY self.state = self.INSTANCE_RETRY
else: else:
return return
@ -165,7 +172,39 @@ class AwsAdapter(statemachine.Adapter):
         self.log = logging.getLogger(
             f"nodepool.AwsAdapter.{provider_config.name}")
         self.provider = provider_config
-        # The standard rate limit, this might be 1 request per second
+        self._running = True
+
+        # AWS has a default rate limit for creating instances that
+        # works out to a sustained 2 instances/sec, but the actual
+        # create instance API call takes 1 second or more. If we want
+        # to achieve faster than 1 instance/second throughput, we need
+        # to parallelize create instance calls, so we set up a
+        # threadworker to do that.
+
+        # A little bit of a heuristic here to set the worker count.
+        # It appears that AWS typically takes 1-1.5 seconds to execute
+        # a create API call. Figure out how many we have to do in
+        # parallel in order to run at the rate limit, then quadruple
+        # that for headroom. Max out at 8 so we don't end up with too
+        # many threads. In practice, this will be 8 with the default
+        # values, and only less if users slow down the rate.
+        workers = max(min(int(self.provider.rate * 4), 8), 1)
+        self.log.info("Create executor with max workers=%s", workers)
+        self.create_executor = ThreadPoolExecutor(max_workers=workers)
+
+        # We can batch delete instances using the AWS API, so to do
+        # that, create a queue for deletes, and a thread to process
+        # the queue. It will be greedy and collect as many pending
+        # instance deletes as possible to delete together. Typically
+        # under load, that will mean a single instance delete followed
+        # by larger batches. That strikes a balance between
+        # responsiveness and efficiency. Reducing the overall number
+        # of requests leaves more time for create instance calls.
+        self.delete_queue = queue.Queue()
+        self.delete_thread = threading.Thread(target=self._deleteThread)
+        self.delete_thread.daemon = True
+        self.delete_thread.start()
+
         self.rate_limiter = RateLimiter(self.provider.name,
                                         self.provider.rate)
         # Non mutating requests can be made more often at 10x the rate
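Working through the heuristic: with the default ``rate`` of 2.0, ``int(2.0 * 4)`` is 8, which the min/max clamp leaves at 8 workers; a slower rate of 0.5 yields 2 workers, and very small rates still get at least one. A minimal sketch of the same computation (the helper name is illustrative; the adapter computes this inline):

    def create_workers(rate):
        # Enough parallel create calls to keep up with the rate limit,
        # times four for headroom, clamped to the range [1, 8].
        return max(min(int(rate * 4), 8), 1)

    assert create_workers(2.0) == 8
    assert create_workers(0.5) == 2
    assert create_workers(0.1) == 1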
@ -190,6 +229,10 @@ class AwsAdapter(statemachine.Adapter):
         self.not_our_images = set()
         self.not_our_snapshots = set()

+    def stop(self):
+        self.create_executor.shutdown()
+        self._running = False
+
     def getCreateStateMachine(self, hostname, label,
                               image_external_id, metadata, retries, log):
         return AwsCreateStateMachine(self, hostname, label,
@ -232,7 +275,7 @@ class AwsAdapter(statemachine.Adapter):
     def deleteResource(self, resource):
         self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
         if resource.type == 'instance':
-            self._deleteInstance(resource.id)
+            self._deleteInstance(resource.id, immediate=True)
         if resource.type == 'volume':
             self._deleteVolume(resource.id)
         if resource.type == 'ami':
@ -519,6 +562,18 @@ class AwsAdapter(statemachine.Adapter):
         with self.non_mutating_rate_limiter:
             return self.ec2.Image(image_id)

+    def _submitCreateInstance(self, label, image_external_id,
+                              tags, hostname, log):
+        return self.create_executor.submit(
+            self._createInstance,
+            label, image_external_id,
+            tags, hostname, log)
+
+    def _completeCreateInstance(self, future):
+        if not future.done():
+            return None
+        return future.result()
+
     def _createInstance(self, label, image_external_id,
                         tags, hostname, log):
         if image_external_id:
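Splitting creation into _submitCreateInstance and _completeCreateInstance keeps advance() non-blocking: the actual create call runs on the executor, and the state machine just polls future.done() on each pass. Note that future.result() re-raises any exception thrown by _createInstance on the worker thread, so create failures still surface inside advance() and feed the existing retry handling. A standalone illustration of that propagation (a sketch, not driver code):

    from concurrent.futures import ThreadPoolExecutor

    def failing_create():
        raise RuntimeError("create failed")

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(failing_create)
        try:
            future.result()  # blocks until done, then re-raises the worker's exception
        except RuntimeError as exc:
            print(f"creation error propagated to the caller: {exc}")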
@ -600,7 +655,38 @@ class AwsAdapter(statemachine.Adapter):
log.debug(f"Created VM {hostname} as instance {instances[0].id}") log.debug(f"Created VM {hostname} as instance {instances[0].id}")
return instances[0] return instances[0]
def _deleteInstance(self, external_id, log=None): def _deleteThread(self):
while self._running:
try:
self._deleteThreadInner()
except Exception:
self.log.exception("Error in delete thread:")
time.sleep(5)
def _deleteThreadInner(self):
records = []
try:
records.append(self.delete_queue.get(block=True, timeout=10))
except queue.Empty:
return
while True:
try:
records.append(self.delete_queue.get(block=False))
except queue.Empty:
break
# The terminate call has a limit of 1k, but AWS recommends
# smaller batches. We limit to 50 here.
if len(records) >= 50:
break
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Deleting instance {del_id}")
count = len(ids)
with self.rate_limiter(log.debug, f"Deleted {count} instances"):
self.ec2_client.terminate_instances(InstanceIds=ids)
def _deleteInstance(self, external_id, log=None, immediate=False):
if log is None: if log is None:
log = self.log log = self.log
for instance in self._listInstances(): for instance in self._listInstances():
@ -609,9 +695,12 @@ class AwsAdapter(statemachine.Adapter):
         else:
             log.warning(f"Instance not found when deleting {external_id}")
             return None
-        with self.rate_limiter(log.debug, "Deleted instance"):
-            log.debug(f"Deleting instance {external_id}")
-            instance.terminate()
+        if immediate:
+            with self.rate_limiter(log.debug, "Deleted instance"):
+                log.debug(f"Deleting instance {external_id}")
+                instance.terminate()
+        else:
+            self.delete_queue.put((external_id, log))
         return instance

     def _deleteVolume(self, external_id):
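Taken together, _deleteInstance now defaults to queueing the instance ID for the background thread, while immediate=True is reserved for the retry path and leaked-resource cleanup, where the caller wants the terminate issued right away. Because _deleteThreadInner drains the queue greedily, under sustained load the first wakeup typically terminates a single instance and later passes collect larger batches (up to 50 per call), each charged as one operation by the rate limiter. The batched call is just a list of IDs; a sketch of the shape, where ec2_client stands in for the adapter's boto3 EC2 client and the IDs are made up:

    ec2_client.terminate_instances(
        InstanceIds=['i-0123456789abcdef0', 'i-0fedcba9876543210'])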

View File

@ -251,7 +251,7 @@ class AwsProviderConfig(ProviderConfig):
         self.profile_name = self.provider.get('profile-name')
         self.region_name = self.provider.get('region-name')

-        self.rate = self.provider.get('rate', 1)
+        self.rate = self.provider.get('rate', 2)
         self.launch_retries = self.provider.get('launch-retries', 3)
         self.launch_timeout = self.provider.get('launch-timeout', 3600)
         self.boot_timeout = self.provider.get('boot-timeout', 180)

View File

@ -493,6 +493,7 @@ class StateMachineProvider(Provider, QuotaSupport):
         self.running = False
         if self.keyscan_worker:
             self.keyscan_worker.shutdown()
+        self.adapter.stop()
         self.log.debug("Stopped")

     def join(self):
@ -821,6 +822,10 @@ class Adapter:
     def __init__(self, provider_config):
         pass

+    def stop(self):
+        """Release any resources as this provider is being stopped"""
+        pass
+
     def getCreateStateMachine(self, hostname, label,
                               image_external_id, metadata, retries,
                               log):

View File

@ -430,8 +430,12 @@ class RateLimiter:
     def __init__(self, name, rate_limit):
         self._running = True
         self.name = name
-        self.delta = 1.0 / rate_limit
+        if not rate_limit:
+            self.delta = 0.0
+        else:
+            self.delta = 1.0 / rate_limit
         self.last_ts = None
+        self.lock = threading.Lock()

     def __call__(self, logmethod, msg):
         return RateLimitInstance(self, logmethod, msg)
@ -440,8 +444,10 @@ class RateLimiter:
         self._enter()

     def _enter(self):
+        with self.lock:
             total_delay = 0.0
             if self.last_ts is None:
+                self.last_ts = time.monotonic()
                 return total_delay
             while True:
                 now = time.monotonic()
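With the guard above, a falsy rate_limit (0 or None) yields delta = 0.0, effectively disabling throttling instead of dividing by zero, while any positive rate gives the minimum spacing between operations: rate_limit=2.0 means delta = 0.5 seconds. The new lock matters because one limiter is now shared by the create executor workers and the delete thread, so _enter must serialize its bookkeeping. A rough usage sketch, assuming the RateLimiter class above (the provider name and message are placeholders), mirroring how the adapter calls it:

    limiter = RateLimiter('example-provider', 2.0)  # at most ~2 mutating calls/sec
    with limiter(print, "Deleted 2 instances"):
        pass  # perform the mutating API call here; entry is delayed to honor delta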

View File

@ -0,0 +1,38 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

zookeeper-tls:
  ca: {zookeeper_ca}
  cert: {zookeeper_cert}
  key: {zookeeper_key}

tenant-resource-limits:
  - tenant-name: tenant-1
    max-cores: 1024

labels:
  - name: ubuntu1404

providers:
  - name: ec2-us-west-2
    driver: aws
    region-name: us-west-2
    cloud-images:
      - name: ubuntu1404
        image-id: ami-1e749f67
        username: ubuntu
    pools:
      - name: main
        max-servers: 10
        subnet-id: {subnet_id}
        security-group-id: {security_group_id}
        node-attributes:
          key1: value1
          key2: value2
        labels:
          - name: ubuntu1404
            cloud-image: ubuntu1404
            instance-type: t3.medium
            key-name: zuul

View File

@ -176,6 +176,34 @@ class TestDriverAws(tests.DBTestCase):
             {'key1': 'value1', 'key2': 'value2'})
         return node

+    def test_aws_multiple(self):
+        # Test creating multiple instances at once. This is most
+        # useful to run manually during development to observe
+        # behavior.
+        configfile = self.setup_config('aws/aws-multiple.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+        self.patchProvider(pool)
+
+        reqs = []
+        for x in range(4):
+            req = zk.NodeRequest()
+            req.state = zk.REQUESTED
+            req.node_types.append('ubuntu1404')
+            self.zk.storeNodeRequest(req)
+            reqs.append(req)
+
+        nodes = []
+        for req in reqs:
+            self.log.debug("Waiting for request %s", req.id)
+            req = self.waitForNodeRequest(req)
+            nodes.append(self.assertSuccess(req))
+        for node in nodes:
+            node.state = zk.USED
+            self.zk.storeNode(node)
+        for node in nodes:
+            self.waitForNodeDeletion(node)
+
     def test_aws_node(self):
         req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
         node = self.assertSuccess(req)