Merge "Refactor NodeLauncher to be generic"
commit e881199916

nodepool/driver/__init__.py

@@ -22,11 +22,16 @@ import importlib
 import logging
 import math
 import os
+import time
+import threading

 import six

+from kazoo import exceptions as kze
+
 from nodepool import zk
+from nodepool import exceptions
+from nodepool import stats


 class Drivers:
@@ -622,6 +627,65 @@ class NodeRequestHandler(object):
         pass


+class NodeLauncher(threading.Thread, stats.StatsReporter):
+    '''
+    Class to launch a single node.
+
+    The NodeRequestHandler may return such object to manage asynchronous
+    node creation.
+
+    Subclasses are required to implement the launch method
+    '''
+
+    def __init__(self, handler, node):
+        threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
+        stats.StatsReporter.__init__(self)
+        self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
+        self.handler = handler
+        self.node = node
+        self.label = handler.pool.labels[node.type]
+        self.pool = self.label.pool
+        self.provider_config = self.pool.provider
+
+    def storeNode(self):
+        """Store the node state in Zookeeper"""
+        self.handler.zk.storeNode(self.node)
+
+    def run(self):
+        start_time = time.monotonic()
+        statsd_key = 'ready'
+
+        try:
+            self.launch()
+        except kze.SessionExpiredError:
+            # Our node lock is gone, leaving the node state as BUILDING.
+            # This will get cleaned up in ZooKeeper automatically, but we
+            # must still set our cached node state to FAILED for the
+            # NodeLaunchManager's poll() method.
+            self.log.error(
+                "Lost ZooKeeper session trying to launch for node %s",
+                self.node.id)
+            self.node.state = zk.FAILED
+            statsd_key = 'error.zksession'
+        except Exception as e:
+            self.log.exception("Launch failed for node %s:",
+                               self.node.id)
+            self.node.state = zk.FAILED
+            self.handler.zk.storeNode(self.node)
+
+            if hasattr(e, 'statsd_key'):
+                statsd_key = e.statsd_key
+            else:
+                statsd_key = 'error.unknown'
+
+        try:
+            dt = int((time.monotonic() - start_time) * 1000)
+            self.recordLaunchStats(statsd_key, dt)
+            self.updateNodeStats(self.handler.zk, self.provider_config)
+        except Exception:
+            self.log.exception("Exception while reporting stats:")
+
+
 class ConfigValue(object):
     def __eq__(self, other):
         if isinstance(other, ConfigValue):
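
The docstring above is the entire contract of the new base class: a driver implements launch(), and NodeLauncher.run() supplies the thread lifecycle, failure handling, and stats reporting. A minimal sketch of how a driver plugs into this, assuming the refactored classes above — the ExampleNodeLauncher/ExampleNodeRequestHandler names and the manager's createInstance() call are invented for illustration, not part of nodepool:

    # Sketch of a hypothetical driver built on the generic NodeLauncher.
    from nodepool import zk
    from nodepool.driver import NodeLauncher, NodeRequestHandler

    class ExampleNodeLauncher(NodeLauncher):
        def launch(self):
            # Invoked by NodeLauncher.run(); any exception raised here is
            # caught by the base class, which marks the node FAILED and
            # records the matching statsd error key.
            server = self.handler.manager.createInstance(  # hypothetical API
                label=self.label, pool=self.pool)
            self.node.external_id = server.id
            self.node.state = zk.READY
            self.storeNode()  # persists the node via self.handler.zk

    class ExampleNodeRequestHandler(NodeRequestHandler):
        def launch(self, node):
            # The handler passes itself to the launcher; that is how the
            # launcher reaches zk, manager, pool and the original request.
            return ExampleNodeLauncher(self, node)
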

nodepool/driver/openstack/handler.py

@@ -13,86 +13,66 @@
 # License for the specific language governing permissions and limitations
 # under the License.

 import logging
 import math
 import pprint
 import random
-import threading
 import time

 from kazoo import exceptions as kze

 from nodepool import exceptions
 from nodepool import nodeutils as utils
-from nodepool import stats
 from nodepool import zk
+from nodepool.driver import NodeLauncher
 from nodepool.driver import NodeRequestHandler
 from nodepool.driver.openstack.provider import QuotaInformation


-class NodeLauncher(threading.Thread, stats.StatsReporter):
-    log = logging.getLogger("nodepool.driver.openstack."
-                            "NodeLauncher")
-
-    def __init__(self, zk, provider_label, provider_manager, requestor,
-                 node, retries):
+class OpenStackNodeLauncher(NodeLauncher):
+    def __init__(self, handler, node, retries):
         '''
         Initialize the launcher.

-        :param ZooKeeper zk: A ZooKeeper object.
-        :param ProviderLabel provider: A config ProviderLabel object.
-        :param ProviderManager provider_manager: The manager object used to
-            interact with the selected provider.
-        :param str requestor: Identifier for the request originator.
+        :param NodeRequestHandler handler: The handler object.
         :param Node node: The node object.
         :param int retries: Number of times to retry failed launches.
         '''
-        threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
-        stats.StatsReporter.__init__(self)
-        self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
-        self._zk = zk
-        self._label = provider_label
-        self._provider_manager = provider_manager
-        self._node = node
+        super().__init__(handler, node)
         self._retries = retries
         self._image_name = None
-        self._requestor = requestor
-
-        self._pool = self._label.pool
-        self._provider_config = self._pool.provider
-        if self._label.diskimage:
-            self._diskimage = self._provider_config.diskimages[
-                self._label.diskimage.name]
+        if self.label.diskimage:
+            self._diskimage = self.provider_config.diskimages[
+                self.label.diskimage.name]
         else:
             self._diskimage = None

-    def logConsole(self, server_id, hostname):
-        if not self._label.console_log:
+    def _logConsole(self, server_id, hostname):
+        if not self.label.console_log:
             return
-        console = self._provider_manager.getServerConsole(server_id)
+        console = self.handler.manager.getServerConsole(server_id)
         if console:
             self.log.debug('Console log from hostname %s:' % hostname)
             for line in console.splitlines():
                 self.log.debug(line.rstrip())

     def _launchNode(self):
-        if self._label.diskimage:
+        if self.label.diskimage:
             # launch using diskimage
-            cloud_image = self._zk.getMostRecentImageUpload(
-                self._diskimage.name, self._provider_config.name)
+            cloud_image = self.handler.zk.getMostRecentImageUpload(
+                self._diskimage.name, self.provider_config.name)

             if not cloud_image:
                 raise exceptions.LaunchNodepoolException(
                     "Unable to find current cloud image %s in %s" %
-                    (self._diskimage.name, self._provider_config.name)
+                    (self._diskimage.name, self.provider_config.name)
                 )

             config_drive = self._diskimage.config_drive
             image_external = dict(id=cloud_image.external_id)
             image_id = "{path}/{upload_id}".format(
-                path=self._zk._imageUploadPath(cloud_image.image_name,
-                                               cloud_image.build_id,
-                                               cloud_image.provider_name),
+                path=self.handler.zk._imageUploadPath(
+                    cloud_image.image_name,
+                    cloud_image.build_id,
+                    cloud_image.provider_name),
                 upload_id=cloud_image.id)
             image_name = self._diskimage.name
             username = cloud_image.username
@@ -101,73 +81,73 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):

         else:
             # launch using unmanaged cloud image
-            config_drive = self._label.cloud_image.config_drive
+            config_drive = self.label.cloud_image.config_drive

-            image_external = self._label.cloud_image.external
-            image_id = self._label.cloud_image.name
-            image_name = self._label.cloud_image.name
-            username = self._label.cloud_image.username
-            connection_type = self._label.cloud_image.connection_type
-            connection_port = self._label.cloud_image.connection_port
+            image_external = self.label.cloud_image.external
+            image_id = self.label.cloud_image.name
+            image_name = self.label.cloud_image.name
+            username = self.label.cloud_image.username
+            connection_type = self.label.cloud_image.connection_type
+            connection_port = self.label.cloud_image.connection_port

-        hostname = self._provider_config.hostname_format.format(
-            label=self._label, provider=self._provider_config, node=self._node
+        hostname = self.provider_config.hostname_format.format(
+            label=self.label, provider=self.provider_config, node=self.node
         )

         self.log.info("Creating server with hostname %s in %s from image %s "
                       "for node id: %s" % (hostname,
-                                           self._provider_config.name,
+                                           self.provider_config.name,
                                            image_name,
-                                           self._node.id))
+                                           self.node.id))

         # NOTE: We store the node ID in the server metadata to use for leaked
         # instance detection. We cannot use the external server ID for this
         # because that isn't available in ZooKeeper until after the server is
         # active, which could cause a race in leak detection.

-        server = self._provider_manager.createServer(
+        server = self.handler.manager.createServer(
             hostname,
             image=image_external,
-            min_ram=self._label.min_ram,
-            flavor_name=self._label.flavor_name,
-            key_name=self._label.key_name,
-            az=self._node.az,
+            min_ram=self.label.min_ram,
+            flavor_name=self.label.flavor_name,
+            key_name=self.label.key_name,
+            az=self.node.az,
             config_drive=config_drive,
-            nodepool_node_id=self._node.id,
-            nodepool_node_label=self._node.type,
+            nodepool_node_id=self.node.id,
+            nodepool_node_label=self.node.type,
             nodepool_image_name=image_name,
-            networks=self._pool.networks,
-            boot_from_volume=self._label.boot_from_volume,
-            volume_size=self._label.volume_size)
+            networks=self.pool.networks,
+            boot_from_volume=self.label.boot_from_volume,
+            volume_size=self.label.volume_size)

-        self._node.external_id = server.id
-        self._node.hostname = hostname
-        self._node.image_id = image_id
+        self.node.external_id = server.id
+        self.node.hostname = hostname
+        self.node.image_id = image_id
         if username:
-            self._node.username = username
-        self._node.connection_type = connection_type
-        self._node.connection_port = connection_port
+            self.node.username = username
+        self.node.connection_type = connection_type
+        self.node.connection_port = connection_port

         # Checkpoint save the updated node info
-        self._zk.storeNode(self._node)
+        self.storeNode()

         self.log.debug("Waiting for server %s for node id: %s" %
-                       (server.id, self._node.id))
-        server = self._provider_manager.waitForServer(
-            server, self._provider_config.launch_timeout,
-            auto_ip=self._pool.auto_floating_ip)
+                       (server.id, self.node.id))
+        server = self.handler.manager.waitForServer(
+            server, self.provider_config.launch_timeout,
+            auto_ip=self.pool.auto_floating_ip)

         if server.status != 'ACTIVE':
             raise exceptions.LaunchStatusException("Server %s for node id: %s "
                                                    "status: %s" %
-                                                   (server.id, self._node.id,
+                                                   (server.id, self.node.id,
                                                     server.status))

         # If we didn't specify an AZ, set it to the one chosen by Nova.
         # Do this after we are done waiting since AZ may not be available
         # immediately after the create request.
-        if not self._node.az:
-            self._node.az = server.location.zone
+        if not self.node.az:
+            self.node.az = server.location.zone

         interface_ip = server.interface_ip
         if not interface_ip:
@@ -177,37 +157,37 @@
             raise exceptions.LaunchNetworkException(
                 "Unable to find public IP of server")

-        self._node.interface_ip = interface_ip
-        self._node.public_ipv4 = server.public_v4
-        self._node.public_ipv6 = server.public_v6
-        self._node.private_ipv4 = server.private_v4
+        self.node.interface_ip = interface_ip
+        self.node.public_ipv4 = server.public_v4
+        self.node.public_ipv6 = server.public_v6
+        self.node.private_ipv4 = server.private_v4
         # devstack-gate multi-node depends on private_v4 being populated
         # with something. On clouds that don't have a private address, use
         # the public.
-        if not self._node.private_ipv4:
-            self._node.private_ipv4 = server.public_v4
+        if not self.node.private_ipv4:
+            self.node.private_ipv4 = server.public_v4

         # Checkpoint save the updated node info
-        self._zk.storeNode(self._node)
+        self.storeNode()

         self.log.debug(
             "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, "
             "ipv6: %s]" %
-            (self._node.id, self._node.region, self._node.az,
-             self._node.interface_ip, self._node.public_ipv4,
-             self._node.public_ipv6))
+            (self.node.id, self.node.region, self.node.az,
+             self.node.interface_ip, self.node.public_ipv4,
+             self.node.public_ipv6))

         # wait and scan the new node and record in ZooKeeper
         host_keys = []
-        if self._pool.host_key_checking:
+        if self.pool.host_key_checking:
             try:
                 self.log.debug(
-                    "Gathering host keys for node %s", self._node.id)
+                    "Gathering host keys for node %s", self.node.id)
                 # only gather host keys if the connection type is ssh
                 gather_host_keys = connection_type == 'ssh'
                 host_keys = utils.nodescan(
                     interface_ip,
-                    timeout=self._provider_config.boot_timeout,
+                    timeout=self.provider_config.boot_timeout,
                     gather_hostkeys=gather_host_keys,
                     port=connection_port)
@@ -215,13 +195,13 @@
                 raise exceptions.LaunchKeyscanException(
                     "Unable to gather host keys")
             except exceptions.ConnectionTimeoutException:
-                self.logConsole(self._node.external_id, self._node.hostname)
+                self._logConsole(self.node.external_id, self.node.hostname)
                 raise

-        self._node.host_keys = host_keys
-        self._zk.storeNode(self._node)
+        self.node.host_keys = host_keys
+        self.storeNode()

-    def _run(self):
+    def launch(self):
         attempts = 1
         while attempts <= self._retries:
             try:
@@ -235,65 +215,28 @@
             if attempts <= self._retries:
                 self.log.exception(
                     "Launch attempt %d/%d failed for node %s:",
-                    attempts, self._retries, self._node.id)
+                    attempts, self._retries, self.node.id)
             # If we created an instance, delete it.
-            if self._node.external_id:
-                self._provider_manager.cleanupNode(self._node.external_id)
-                self._provider_manager.waitForNodeCleanup(
-                    self._node.external_id
-                )
-                self._node.external_id = None
-                self._node.public_ipv4 = None
-                self._node.public_ipv6 = None
-                self._node.interface_ip = None
-                self._zk.storeNode(self._node)
+            if self.node.external_id:
+                self.handler.manager.cleanupNode(self.node.external_id)
+                self.handler.manager.waitForNodeCleanup(
+                    self.node.external_id)
+                self.node.external_id = None
+                self.node.public_ipv4 = None
+                self.node.public_ipv6 = None
+                self.node.interface_ip = None
+                self.storeNode()
             if attempts == self._retries:
                 raise
             # Invalidate the quota cache if we encountered a quota error.
             if 'quota exceeded' in str(e).lower():
                 self.log.info("Quota exceeded, invalidating quota cache")
-                self._provider_manager.invalidateQuotaCache()
+                self.handler.manager.invalidateQuotaCache()
             attempts += 1

-        self._node.state = zk.READY
-        self._zk.storeNode(self._node)
-        self.log.info("Node id %s is ready", self._node.id)
-
-    def run(self):
-        start_time = time.time()
-        statsd_key = 'ready'
-
-        try:
-            self._run()
-        except kze.SessionExpiredError:
-            # Our node lock is gone, leaving the node state as BUILDING.
-            # This will get cleaned up in ZooKeeper automatically, but we
-            # must still set our cached node state to FAILED for the
-            # NodeLaunchManager's poll() method.
-            self.log.error(
-                "Lost ZooKeeper session trying to launch for node %s",
-                self._node.id)
-            self._node.state = zk.FAILED
-            statsd_key = 'error.zksession'
-        except Exception as e:
-            self.log.exception("Launch failed for node %s:",
-                               self._node.id)
-            self._node.state = zk.FAILED
-            self._zk.storeNode(self._node)
-
-            if hasattr(e, 'statsd_key'):
-                statsd_key = e.statsd_key
-            else:
-                statsd_key = 'error.unknown'
-
-        try:
-            dt = int((time.time() - start_time) * 1000)
-            self.recordLaunchStats(statsd_key, dt, self._image_name,
-                                   self._node.provider, self._node.az,
-                                   self._requestor)
-            self.updateNodeStats(self._zk, self._provider_config)
-        except Exception:
-            self.log.exception("Exception while reporting stats:")
+        self.node.state = zk.READY
+        self.storeNode()
+        self.log.info("Node id %s is ready", self.node.id)


 class OpenStackNodeRequestHandler(NodeRequestHandler):
@@ -301,9 +244,6 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
     def __init__(self, pw, request):
         super().__init__(pw, request)
         self.chosen_az = None
-        self.log = logging.getLogger(
-            "nodepool.driver.openstack.OpenStackNodeRequestHandler[%s]" %
-            self.launcher_id)

     def hasRemainingQuota(self, ntype):
         needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool)
@@ -388,7 +328,4 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
         node.region = self.provider.region_name

     def launch(self, node):
-        return NodeLauncher(
-            self.zk, self.pool.labels[node.type], self.manager,
-            self.request.requestor, node,
-            self.provider.launch_retries)
+        return OpenStackNodeLauncher(self, node, self.provider.launch_retries)

nodepool/stats.py

@@ -51,35 +51,30 @@ class StatsReporter(object):
         super(StatsReporter, self).__init__()
         self._statsd = get_client()

-    def recordLaunchStats(self, subkey, dt, image_name,
-                          provider_name, node_az, requestor):
+    def recordLaunchStats(self, subkey, dt):
         '''
         Record node launch statistics.

         :param str subkey: statsd key
         :param int dt: Time delta in milliseconds
-        :param str image_name: Name of the image used
-        :param str provider_name: Name of the provider
-        :param str node_az: AZ of the launched node
-        :param str requestor: Identifier for the request originator
         '''
         if not self._statsd:
             return

         keys = [
-            'nodepool.launch.provider.%s.%s' % (provider_name, subkey),
-            'nodepool.launch.image.%s.%s' % (image_name, subkey),
+            'nodepool.launch.provider.%s.%s' % (
+                self.provider_config.name, subkey),
             'nodepool.launch.%s' % (subkey,),
         ]

-        if node_az:
+        if self.node.az:
             keys.append('nodepool.launch.provider.%s.%s.%s' %
-                        (provider_name, node_az, subkey))
+                        (self.provider_config.name, self.node.az, subkey))

-        if requestor:
+        if self.handler.request.requestor:
             # Replace '.' which is a graphite hierarchy, and ':' which is
             # a statsd delimeter.
-            requestor = requestor.replace('.', '_')
+            requestor = self.handler.request.requestor.replace('.', '_')
             requestor = requestor.replace(':', '_')
             keys.append('nodepool.launch.requestor.%s.%s' %
                         (requestor, subkey))
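
The signature shrinks because every removed parameter is now reachable through attributes the launcher mixin already owns (self.provider_config, self.node, self.handler.request); note that the per-image key, nodepool.launch.image.*, is dropped outright. A worked expansion of the keys for a hypothetical call — the provider name 'rax', AZ 'az1', and requestor 'zuul:executor' are example values, not from this change:

    # recordLaunchStats('ready', dt) with provider_config.name = 'rax',
    # node.az = 'az1', handler.request.requestor = 'zuul:executor'
    keys = [
        'nodepool.launch.provider.rax.ready',
        'nodepool.launch.ready',
    ]
    # node.az is set, so the per-AZ key is appended:
    keys.append('nodepool.launch.provider.rax.az1.ready')
    # '.' (graphite hierarchy) and ':' (statsd delimiter) are sanitized:
    requestor = 'zuul:executor'.replace('.', '_').replace(':', '_')
    keys.append('nodepool.launch.requestor.%s.ready' % requestor)
    # -> 'nodepool.launch.requestor.zuul_executor.ready'
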

nodepool/tests/test_launcher.py

@@ -1292,7 +1292,8 @@ class TestLauncher(tests.DBTestCase):
         time.sleep(1)
         launchers = self.zk.getRegisteredLaunchers()

-    @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode')
+    @mock.patch('nodepool.driver.openstack.handler.'
+                'OpenStackNodeLauncher._launchNode')
     def test_launchNode_session_expired(self, mock_launch):
         '''
         Test ZK session lost during _launchNode().

nodepool/tests/test_nodelaunchmanager.py

@@ -89,7 +89,8 @@ class TestNodeLaunchManager(tests.DBTestCase):
         self.assertEqual(nodes[0]['metadata']['groups'],
                          'fake-provider,fake-image,fake-label')

-    @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode')
+    @mock.patch('nodepool.driver.openstack.handler.'
+                'OpenStackNodeLauncher._launchNode')
     def test_failed_launch(self, mock_launch):
         configfile = self.setup_config('node.yaml')
         self._setup(configfile)
@@ -105,7 +106,8 @@ class TestNodeLaunchManager(tests.DBTestCase):
         self.assertEqual(len(handler.failed_nodes), 1)
         self.assertEqual(len(handler.ready_nodes), 0)

-    @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode')
+    @mock.patch('nodepool.driver.openstack.handler.'
+                'OpenStackNodeLauncher._launchNode')
     def test_mixed_launch(self, mock_launch):
         configfile = self.setup_config('node.yaml')
         self._setup(configfile)