From be3edd3e17eb1f262c77a5adf6ca3f9c5214886e Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Mon, 24 Oct 2022 12:43:26 -0700
Subject: [PATCH] Convert openstack driver to statemachine

This updates the OpenStack driver to use the statemachine framework.

The goal is to revise all remaining drivers to use the statemachine
framework for two reasons:

1) We can dramatically reduce the number of threads in Nodepool which
is our biggest scaling bottleneck. The OpenStack driver already
includes some work in that direction, but in a way that is unique to
it and not easily shared by other drivers. The statemachine framework
is an extension of that idea implemented so that every driver can use
it. This change further reduces the number of threads needed even for
the openstack driver.

2) By unifying all the drivers with a simple interface, we can
prepare to move them into Zuul.

There are a few updates to the statemachine framework to accommodate
some features that only the OpenStack driver used to date.

A number of tests need slight alteration since the openstack driver
is the basis of the "fake" driver used for tests.

Change-Id: Ie59a4e9f09990622b192ad840d9c948db717cce2
---
 nodepool/cmd/nodepoolcmd.py | 2 +-
 nodepool/driver/__init__.py | 1 +
 nodepool/driver/aws/adapter.py | 2 +-
 nodepool/driver/aws/config.py | 1 +
 nodepool/driver/azure/adapter.py | 2 +-
 nodepool/driver/azure/config.py | 1 +
 nodepool/driver/example/config.py | 1 +
 nodepool/driver/fake/__init__.py | 38 +-
 .../driver/fake/{provider.py => adapter.py} | 136 +--
 nodepool/driver/fake/handler.py | 19 -
 nodepool/driver/gce/adapter.py | 2 +-
 nodepool/driver/gce/config.py | 1 +
 nodepool/driver/ibmvpc/adapter.py | 2 +-
 nodepool/driver/ibmvpc/config.py | 1 +
 nodepool/driver/metastatic/adapter.py | 2 +-
 nodepool/driver/metastatic/config.py | 1 +
 nodepool/driver/openstack/__init__.py | 38 +-
 nodepool/driver/openstack/adapter.py | 801 ++++++++++++++++++
 nodepool/driver/openstack/config.py | 38 +-
 nodepool/driver/openstack/handler.py | 477 -----------
 nodepool/driver/openstack/provider.py | 659 --------------
 nodepool/driver/statemachine.py | 116 ++-
 nodepool/tests/__init__.py | 10 +-
 .../tests/fixtures/node_auto_floating_ip.yaml | 2 +-
 nodepool/tests/unit/test_builder.py | 10 +-
 nodepool/tests/unit/test_commands.py | 2 +-
 nodepool/tests/unit/test_driver_metastatic.py | 10 +-
 nodepool/tests/unit/test_driver_static.py | 3 +-
 nodepool/tests/unit/test_launcher.py | 114 +--
 nodepool/tests/unit/test_sdk_integration.py | 10 +-
 30 files changed, 1115 insertions(+), 1387 deletions(-)
 rename nodepool/driver/fake/{provider.py => adapter.py} (81%)
 delete mode 100644 nodepool/driver/fake/handler.py
 create mode 100644 nodepool/driver/openstack/adapter.py
 delete mode 100644 nodepool/driver/openstack/handler.py
 delete mode 100755 nodepool/driver/openstack/provider.py

diff --git a/nodepool/cmd/nodepoolcmd.py b/nodepool/cmd/nodepoolcmd.py
index 22b62eaab..d3b9af820 100644
--- a/nodepool/cmd/nodepoolcmd.py
+++ b/nodepool/cmd/nodepoolcmd.py
@@ -256,7 +256,7 @@ class NodePoolCmd(NodepoolApp):
                 # up in alien list since we can't do anything about them
                 # anyway.
provider_images = [ - image for image in manager.listImages() + image for image in manager.adapter._listImages() if 'nodepool_build_id' in image['properties']] except Exception as e: log.warning("Exception listing alien images for %s: %s" diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 3e85b9148..dfacc59a6 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -894,6 +894,7 @@ class ConfigPool(ConfigValue, metaclass=abc.ABCMeta): self.node_attributes = None self.priority = None self.ignore_provider_quota = False + self.azs = None @classmethod def getCommonSchemaDict(self): diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py index 54a0621c9..318961109 100644 --- a/nodepool/driver/aws/adapter.py +++ b/nodepool/driver/aws/adapter.py @@ -289,7 +289,7 @@ class AwsAdapter(statemachine.Adapter): self._running = False def getCreateStateMachine(self, hostname, label, image_external_id, - metadata, retries, request, log): + metadata, retries, request, az, log): return AwsCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py index 2c27a4842..2d281ead7 100644 --- a/nodepool/driver/aws/config.py +++ b/nodepool/driver/aws/config.py @@ -176,6 +176,7 @@ class AwsLabel(ConfigValue): self.iam_instance_profile = label.get('iam-instance-profile', None) self.tags = label.get('tags', {}) self.dynamic_tags = label.get('dynamic-tags', {}) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/azure/adapter.py b/nodepool/driver/azure/adapter.py index 474fa52dc..757d8b76c 100644 --- a/nodepool/driver/azure/adapter.py +++ b/nodepool/driver/azure/adapter.py @@ -347,7 +347,7 @@ class AzureAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, - request, log): + request, az, log): return AzureCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/azure/config.py b/nodepool/driver/azure/config.py index bd439443f..87487d8eb 100644 --- a/nodepool/driver/azure/config.py +++ b/nodepool/driver/azure/config.py @@ -172,6 +172,7 @@ class AzureLabel(ConfigValue): self.dynamic_tags = label.get('dynamic-tags', {}) self.user_data = self._encodeData(label.get('user-data', None)) self.custom_data = self._encodeData(label.get('custom-data', None)) + self.host_key_checking = self.pool.host_key_checking def _encodeData(self, s): if not s: diff --git a/nodepool/driver/example/config.py b/nodepool/driver/example/config.py index bb5bd9995..8690431cc 100644 --- a/nodepool/driver/example/config.py +++ b/nodepool/driver/example/config.py @@ -54,6 +54,7 @@ class ProviderLabel(ConfigValue): self.instance_type = None # The ProviderPool object that owns this label. self.pool = None + self.host_key_checking = None def __eq__(self, other): if isinstance(other, ProviderLabel): diff --git a/nodepool/driver/fake/__init__.py b/nodepool/driver/fake/__init__.py index 03b5ffd65..f77f1110b 100644 --- a/nodepool/driver/fake/__init__.py +++ b/nodepool/driver/fake/__init__.py @@ -1,34 +1,34 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at +# Copyright 2022 Acme Gating, LLC # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. from openstack.config import loader -from nodepool.driver import Driver +from nodepool.driver.statemachine import StateMachineDriver from nodepool.driver.fake.config import FakeProviderConfig -from nodepool.driver.fake.provider import FakeProvider +from nodepool.driver.fake.adapter import FakeAdapter -class FakeDriver(Driver): - def __init__(self): - super().__init__() - self.reset() - +class FakeDriver(StateMachineDriver): def reset(self): self.openstack_config = loader.OpenStackConfig() + def __init__(self): + self.reset() + super().__init__() + def getProviderConfig(self, provider): return FakeProviderConfig(self, provider) - def getProvider(self, provider_config): - return FakeProvider(provider_config) + def getAdapter(self, provider_config): + return FakeAdapter(provider_config) diff --git a/nodepool/driver/fake/provider.py b/nodepool/driver/fake/adapter.py similarity index 81% rename from nodepool/driver/fake/provider.py rename to nodepool/driver/fake/adapter.py index 48dcb98f3..7702debeb 100644 --- a/nodepool/driver/fake/provider.py +++ b/nodepool/driver/fake/adapter.py @@ -1,18 +1,17 @@ # Copyright (C) 2011-2013 OpenStack Foundation +# Copyright 2022 Acme Gating, LLC # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
import logging import threading @@ -20,12 +19,11 @@ import time import uuid import openstack.exceptions - -from nodepool import exceptions -from nodepool.driver.openstack.provider import OpenStackProvider -from nodepool.driver.fake.handler import FakeNodeRequestHandler from openstack.cloud.exc import OpenStackCloudCreateException +from nodepool.driver.openstack.adapter import OpenStackAdapter +from nodepool import exceptions + class Dummy(object): IMAGE = 'Image' @@ -56,6 +54,9 @@ class Dummy(object): args = ' '.join(args) return '<%s %s %s>' % (self.__kind, id(self), args) + def __contains__(self, key): + return key in self.__dict__ + def __getitem__(self, key, default=None): return getattr(self, key, default) @@ -90,10 +91,13 @@ class FakeOpenStackCloud(object): ] if networks is None: self.ipv6_network_uuid = uuid.uuid4().hex + self.no_auto_ip_network_uuid = uuid.uuid4().hex networks = [dict(id=uuid.uuid4().hex, name='fake-public-network-name'), dict(id=uuid.uuid4().hex, name='fake-private-network-name'), + dict(id=self.no_auto_ip_network_uuid, + name='no-auto-ip-network-name'), dict(id=self.ipv6_network_uuid, name='fake-ipv6-network-name')] self.networks = networks @@ -112,6 +116,7 @@ class FakeOpenStackCloud(object): Dummy(Dummy.PORT, id=uuid.uuid4().hex, status='DOWN', device_owner=None), ] + self._floating_ip_list = [] def _update_quota(self): self.max_cores, self.max_instances, self.max_ram = FakeOpenStackCloud.\ @@ -141,7 +146,10 @@ class FakeOpenStackCloud(object): addresses = None # if keyword 'ipv6-uuid' is found in provider config, # ipv6 address will be available in public addr dict. + auto_ip = True for nic in nics: + if nic['net-id'] == self.no_auto_ip_network_uuid: + auto_ip = False if nic['net-id'] != self.ipv6_network_uuid: continue addresses = dict( @@ -156,15 +164,20 @@ class FakeOpenStackCloud(object): interface_ip = 'fake_v6' break if not addresses: - addresses = dict( - public=[dict(version=4, addr='fake')], - private=[dict(version=4, addr='fake')] - ) - public_v6 = '' - public_v4 = 'fake' - private_v4 = 'fake' host_id = 'fake' - interface_ip = 'fake' + private_v4 = 'fake' + if auto_ip: + addresses = dict( + public=[dict(version=4, addr='fake')], + private=[dict(version=4, addr='fake')] + ) + public_v6 = '' + public_v4 = 'fake' + interface_ip = 'fake' + else: + public_v4 = '' + public_v6 = '' + interface_ip = private_v4 self._update_quota() over_quota = False if (instance_type == Dummy.INSTANCE and @@ -190,12 +203,13 @@ class FakeOpenStackCloud(object): host_id=host_id, interface_ip=interface_ip, security_groups=security_groups, - location=Dummy(Dummy.LOCATION, zone=kw.get('az')), + location=Dummy(Dummy.LOCATION, zone=az), metadata=kw.get('meta', {}), manager=self, key_name=kw.get('key_name', None), should_fail=should_fail, over_quota=over_quota, + flavor=kw.get('flavor'), event=threading.Event(), _kw=kw) instance_list.append(s) @@ -273,19 +287,22 @@ class FakeOpenStackCloud(object): def get_server_by_id(self, server_id): return self.get_server(server_id) - def _clean_floating_ip(self, server): - server.public_v4 = '' - server.public_v6 = '' - server.interface_ip = server.private_v4 - return server + def list_floating_ips(self): + return self._floating_ip_list - def wait_for_server(self, server, **kwargs): - while server.status == 'BUILD': - time.sleep(0.1) - auto_ip = kwargs.get('auto_ip') - if not auto_ip: - server = self._clean_floating_ip(server) - return server + def create_floating_ip(self, server): + fip = Dummy('floating_ips', + id=uuid.uuid4().hex, + 
floating_ip_address='fake', + status='ACTIVE') + self._floating_ip_list.append(fip) + return fip + + def _needs_floating_ip(self, server, nat_destination): + return False + + def _has_floating_ips(self): + return False def list_servers(self, bare=False): return self._server_list @@ -326,8 +343,8 @@ class FakeOpenStackCloud(object): class FakeUploadFailCloud(FakeOpenStackCloud): log = logging.getLogger("nodepool.FakeUploadFailCloud") - def __init__(self, times_to_fail=None): - super(FakeUploadFailCloud, self).__init__() + def __init__(self, *args, times_to_fail=None, **kw): + super(FakeUploadFailCloud, self).__init__(*args, **kw) self.times_to_fail = times_to_fail self.times_failed = 0 @@ -344,14 +361,16 @@ class FakeUploadFailCloud(FakeOpenStackCloud): class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud): log = logging.getLogger("nodepool.FakeLaunchAndGetFaultCloud") - def __init__(self): - super().__init__() - - def wait_for_server(self, server, **kwargs): + def create_server(self, *args, **kwargs): # OpenStack provider launch code specifically looks for 'quota' in # the failure message. + server = super().create_server( + *args, **kwargs, + done_status='ERROR') + # Don't wait for the async update + server.status = 'ERROR' server.fault = {'message': 'quota server fault'} - raise Exception("wait_for_server failure") + raise OpenStackCloudCreateException('server', server.id) class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): @@ -366,16 +385,18 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): self.launch_success = False self.delete_success = False - def wait_for_server(self, **kwargs): + def create_server(self, *args, **kwargs): if self.times_to_fail_launch is None: raise Exception("Test fail server launch.") if self.times_failed_launch < self.times_to_fail_launch: self.times_failed_launch += 1 - raise exceptions.ServerDeleteException("Test fail server launch.") + # Simulate a failure after the server record is created + ret = super().create_server(*args, **kwargs, done_status='ERROR') + ret.fault = {'message': 'expected error'} + return ret else: self.launch_success = True - return super(FakeLaunchAndDeleteFailCloud, - self).wait_for_server(**kwargs) + return super().create_server(*args, **kwargs) def delete_server(self, *args, **kwargs): if self.times_to_fail_delete is None: @@ -385,8 +406,7 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): raise exceptions.ServerDeleteException("Test fail server delete.") else: self.delete_success = True - return super(FakeLaunchAndDeleteFailCloud, - self).delete_server(*args, **kwargs) + return super().delete_server(*args, **kwargs) class FakeDeleteImageFailCloud(FakeOpenStackCloud): @@ -404,31 +424,23 @@ class FakeDeleteImageFailCloud(FakeOpenStackCloud): self).delete_image(*args, **kwargs) -class FakeProvider(OpenStackProvider): +class FakeAdapter(OpenStackAdapter): fake_cloud = FakeOpenStackCloud - def __init__(self, provider): + def __init__(self, provider_config): self.createServer_fails = 0 self.createServer_fails_with_external_id = 0 - self.__client = FakeProvider.fake_cloud() - super(FakeProvider, self).__init__(provider) + self.__client = FakeAdapter.fake_cloud() + super().__init__(provider_config) def _getClient(self): return self.__client - def createServer(self, *args, **kwargs): + def _createServer(self, *args, **kwargs): while self.createServer_fails: self.createServer_fails -= 1 raise Exception("Expected createServer exception") while self.createServer_fails_with_external_id: self.createServer_fails_with_external_id 
-= 1 raise OpenStackCloudCreateException('server', 'fakeid') - return super(FakeProvider, self).createServer(*args, **kwargs) - - def getRequestHandler(self, poolworker, request): - return FakeNodeRequestHandler(poolworker, request) - - def start(self, zk_conn): - if self.provider.region_name == 'broken-region': - raise Exception("Broken cloud config") - super().start(zk_conn) + return super()._createServer(*args, **kwargs) diff --git a/nodepool/driver/fake/handler.py b/nodepool/driver/fake/handler.py deleted file mode 100644 index dbd949c55..000000000 --- a/nodepool/driver/fake/handler.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2017 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler - - -class FakeNodeRequestHandler(OpenStackNodeRequestHandler): - launcher_id = "Fake" diff --git a/nodepool/driver/gce/adapter.py b/nodepool/driver/gce/adapter.py index be1ffca73..268d755f0 100644 --- a/nodepool/driver/gce/adapter.py +++ b/nodepool/driver/gce/adapter.py @@ -162,7 +162,7 @@ class GceAdapter(statemachine.Adapter): self.provider.rate) def getCreateStateMachine(self, hostname, label, image_external_id, - metadata, retries, request, log): + metadata, retries, request, az, log): return GceCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/gce/config.py b/nodepool/driver/gce/config.py index 0f025a557..c5817b55e 100644 --- a/nodepool/driver/gce/config.py +++ b/nodepool/driver/gce/config.py @@ -61,6 +61,7 @@ class GceLabel(ConfigValue): self.volume_type = label.get('volume-type', 'pd-standard') self.volume_size = label.get('volume-size', '10') self.diskimage = None + self.host_key_checking = self.pool.host_key_checking class GcePool(ConfigPool): diff --git a/nodepool/driver/ibmvpc/adapter.py b/nodepool/driver/ibmvpc/adapter.py index 404a3d18d..f526ba08d 100644 --- a/nodepool/driver/ibmvpc/adapter.py +++ b/nodepool/driver/ibmvpc/adapter.py @@ -381,7 +381,7 @@ class IBMVPCAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, - request, log): + request, az, log): return IBMVPCCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) diff --git a/nodepool/driver/ibmvpc/config.py b/nodepool/driver/ibmvpc/config.py index c624cccfb..57dbfbb87 100644 --- a/nodepool/driver/ibmvpc/config.py +++ b/nodepool/driver/ibmvpc/config.py @@ -162,6 +162,7 @@ class IBMVPCLabel(ConfigValue): self.profile = label['profile'] self.user_data = label.get('user-data', None) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/metastatic/adapter.py b/nodepool/driver/metastatic/adapter.py index 558ec7a61..b99a29b38 100644 --- a/nodepool/driver/metastatic/adapter.py +++ b/nodepool/driver/metastatic/adapter.py @@ -277,7 +277,7 @@ class MetastaticAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, 
image_external_id, metadata, retries, - request, log): + request, az, log): return MetastaticCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) diff --git a/nodepool/driver/metastatic/config.py b/nodepool/driver/metastatic/config.py index 443db4d84..f4863a71f 100644 --- a/nodepool/driver/metastatic/config.py +++ b/nodepool/driver/metastatic/config.py @@ -45,6 +45,7 @@ class MetastaticLabel(ConfigValue): self.cloud_image = MetastaticCloudImage() self.max_parallel_jobs = label.get('max-parallel-jobs', 1) self.grace_time = label.get('grace-time', 60) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/openstack/__init__.py b/nodepool/driver/openstack/__init__.py index add696f52..178d7a09e 100644 --- a/nodepool/driver/openstack/__init__.py +++ b/nodepool/driver/openstack/__init__.py @@ -1,34 +1,34 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2022 Acme Gating, LLC # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. from openstack.config import loader -from nodepool.driver import Driver +from nodepool.driver.statemachine import StateMachineDriver from nodepool.driver.openstack.config import OpenStackProviderConfig -from nodepool.driver.openstack.provider import OpenStackProvider +from nodepool.driver.openstack.adapter import OpenStackAdapter -class OpenStackDriver(Driver): - def __init__(self): - super().__init__() - self.reset() - +class OpenStackDriver(StateMachineDriver): def reset(self): self.openstack_config = loader.OpenStackConfig() + def __init__(self): + self.reset() + super().__init__() + def getProviderConfig(self, provider): return OpenStackProviderConfig(self, provider) - def getProvider(self, provider_config): - return OpenStackProvider(provider_config) + def getAdapter(self, provider_config): + return OpenStackAdapter(provider_config) diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py new file mode 100644 index 000000000..2629ef3fe --- /dev/null +++ b/nodepool/driver/openstack/adapter.py @@ -0,0 +1,801 @@ +# Copyright (C) 2011-2013 OpenStack Foundation +# Copyright 2017 Red Hat +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent.futures import ThreadPoolExecutor +import functools +import logging +import time +import operator + +import cachetools.func +import openstack + +from nodepool.driver.utils import QuotaInformation +from nodepool.driver import statemachine +from nodepool import exceptions +from nodepool import stats +from nodepool import version + +CACHE_TTL = 10 + + +class OpenStackInstance(statemachine.Instance): + def __init__(self, provider, server, quota): + super().__init__() + self.external_id = server.id + self.metadata = server.metadata + self.private_ipv4 = server.private_v4 + self.private_ipv6 = None + self.public_ipv4 = server.public_v4 + self.public_ipv6 = server.public_v6 + self.host_id = server.host_id + self.cloud = provider.cloud_config.name + self.region = provider.region_name + self.az = server.location.zone + + self.interface_ip = server.interface_ip + self.quota = quota + + def getQuotaInformation(self): + return self.quota + + +class OpenStackResource(statemachine.Resource): + def __init__(self, metadata, type, id): + super().__init__(metadata) + self.type = type + self.id = id + + +class OpenStackDeleteStateMachine(statemachine.StateMachine): + FLOATING_IP_DELETING = 'deleting floating ip' + SERVER_DELETE = 'delete server' + SERVER_DELETING = 'deleting server' + COMPLETE = 'complete' + + def __init__(self, adapter, external_id, log): + self.log = log + super().__init__() + self.adapter = adapter + self.external_id = external_id + self.floating_ips = None + + def advance(self): + if self.state == self.START: + self.server = self.adapter._getServer(self.external_id) + if (self.server and + self.adapter._hasFloatingIps() and + self.server.addresses): + self.floating_ips = self.adapter._getFloatingIps(self.server) + for fip in self.floating_ips: + self.adapter._deleteFloatingIp(fip) + self.state = self.FLOATING_IP_DELETING + if not self.floating_ips: + self.state = self.SERVER_DELETE + + if self.state == self.FLOATING_IP_DELETING: + fips = [] + for fip in self.floating_ips: + fip = self.adapter._refreshFloatingIpDelete(fip) + if not fip or fip['status'] == 'DOWN': + fip = None + if fip: + fips.append(fip) + self.floating_ips = fips + if self.floating_ips: + return + else: + self.state = self.SERVER_DELETE + + if self.state == self.SERVER_DELETE: + self.adapter._deleteServer(self.external_id) + self.state = self.SERVER_DELETING + + if self.state == self.SERVER_DELETING: + self.server = self.adapter._refreshServerDelete(self.server) + if self.server: + return + else: + self.state = self.COMPLETE + + if self.state == self.COMPLETE: + self.complete = True + + +class OpenStackCreateStateMachine(statemachine.StateMachine): + SERVER_CREATING_SUBMIT = 'submit creating server' + SERVER_CREATING = 'creating server' + SERVER_RETRY = 'retrying server creation' + SERVER_RETRY_DELETING = 'deleting server for retry' + FLOATING_IP_CREATING = 'creating floating ip' + FLOATING_IP_ATTACHING = 'attaching floating ip' + COMPLETE = 'complete' + + def __init__(self, adapter, hostname, label, image_external_id, + metadata, retries, request, az, log): + self.log = log + 
super().__init__() + self.adapter = adapter + self.provider = adapter.provider + self.retries = retries + self.attempts = 0 + self.label = label + self.server = None + self.hostname = hostname + self.request = request + self.az = az + + if image_external_id: + self.image_external = image_external_id + diskimage = self.provider.diskimages[label.diskimage.name] + self.config_drive = diskimage.config_drive + image_name = diskimage.name + else: + # launch using unmanaged cloud image + self.config_drive = label.cloud_image.config_drive + + if label.cloud_image.image_id: + # Using a dict with the ID bypasses an image search during + # server creation. + self.image_external = dict(id=label.cloud_image.image_id) + else: + self.image_external = label.cloud_image.external_name + image_name = label.cloud_image.name + + props = label.instance_properties.copy() + for k, v in label.dynamic_instance_properties.items(): + try: + props[k] = v.format(request=self.request.getSafeAttributes()) + except Exception: + self.log.exception( + "Error formatting dynamic instance property %s", k) + if not props: + props = None + + # Put provider.name and image_name in as groups so that ansible + # inventory can auto-create groups for us based on each of those + # qualities + # Also list each of those values directly so that non-ansible + # consumption programs don't need to play a game of knowing that + # groups[0] is the image name or anything silly like that. + groups_list = [self.provider.name] + groups_list.append(image_name) + groups_list.append(label.name) + meta = dict( + groups=",".join(groups_list), + ) + # merge in any instance properties provided from config + if props: + meta.update(props) + # merge nodepool-internal metadata + meta.update(metadata) + self.metadata = meta + self.flavor = self.adapter._findFlavor( + flavor_name=self.label.flavor_name, + min_ram=self.label.min_ram) + self.quota = QuotaInformation.construct_from_flavor(self.flavor) + self.external_id = None + + def _handleServerFault(self): + if not self.external_id: + return + try: + server = self.adapter._getServerByIdNow(self.external_id) + if not server: + return + fault = server.get('fault', {}).get('message') + if fault: + self.log.error('Detailed node error: %s', fault) + if 'quota' in fault: + self.quota_exceeded = True + except Exception: + self.log.exception( + 'Failed to retrieve node error information:') + + def advance(self): + if self.state == self.START: + self.external_id = None + self.quota_exceeded = False + self.create_future = self.adapter._submitApi( + self.adapter._createServer, + self.hostname, + image=self.image_external, + flavor=self.flavor, + key_name=self.label.key_name, + az=self.az, + config_drive=self.config_drive, + networks=self.label.networks, + security_groups=self.label.pool.security_groups, + boot_from_volume=self.label.boot_from_volume, + volume_size=self.label.volume_size, + instance_properties=self.metadata, + userdata=self.label.userdata) + self.state = self.SERVER_CREATING_SUBMIT + + if self.state == self.SERVER_CREATING_SUBMIT: + try: + try: + self.server = self.adapter._completeApi(self.create_future) + if self.server is None: + return + self.external_id = self.server.id + self.state = self.SERVER_CREATING + except openstack.cloud.exc.OpenStackCloudCreateException as e: + if e.resource_id: + self.external_id = e.resource_id + self._handleServerFault() + raise + except Exception as e: + self.log.exception("Launch attempt %d/%d failed:", + self.attempts, self.retries) + if 'quota exceeded' in 
str(e).lower(): + self.quota_exceeded = True + if 'number of ports exceeded' in str(e).lower(): + self.quota_exceeded = True + self.state = self.SERVER_RETRY + + if self.state == self.SERVER_CREATING: + self.server = self.adapter._refreshServer(self.server) + + if self.server.status == 'ACTIVE': + if (self.label.pool.auto_floating_ip and + self.adapter._needsFloatingIp(self.server)): + self.floating_ip = self.adapter._createFloatingIp( + self.server) + self.state = self.FLOATING_IP_CREATING + else: + self.state = self.COMPLETE + elif self.server.status == 'ERROR': + if ('fault' in self.server and self.server['fault'] is not None + and 'message' in self.server['fault']): + self.log.error( + "Error in creating the server." + " Compute service reports fault: {reason}".format( + reason=self.server['fault']['message'])) + if self.external_id: + try: + self.server = self.adapter._deleteServer( + self.external_id) + except Exception: + self.log.exception("Error deleting server:") + self.server = None + else: + self.server = None + self.state = self.SERVER_RETRY + else: + return + + if self.state == self.SERVER_RETRY: + if self.external_id: + try: + self.server = self.adapter._deleteServer(self.external_id) + except Exception: + self.log.exception("Error deleting server:") + # We must keep trying the delete until timeout in + # order to avoid having two servers for the same + # node id. + return + else: + self.server = None + self.state = self.SERVER_RETRY_DELETING + + if self.state == self.SERVER_RETRY_DELETING: + self.server = self.adapter._refreshServerDelete(self.server) + if self.server: + return + self.attempts += 1 + if self.attempts >= self.retries: + raise Exception("Too many retries") + if self.quota_exceeded: + # A quota exception is not directly recoverable so bail + # out immediately with a specific exception. + self.log.info("Quota exceeded, invalidating quota cache") + raise exceptions.QuotaException("Quota exceeded") + self.state = self.START + return + + if self.state == self.FLOATING_IP_CREATING: + self.floating_ip = self.adapter._refreshFloatingIp( + self.floating_ip) + if self.floating_ip.get('port_id', None): + if self.floating_ip['status'] == 'ACTIVE': + self.state = self.FLOATING_IP_ATTACHING + else: + return + else: + self.adapter._attachIpToServer(self.server, self.floating_ip) + self.state = self.FLOATING_IP_ATTACHING + + if self.state == self.FLOATING_IP_ATTACHING: + self.server = self.adapter._refreshServer(self.server) + ext_ip = openstack.cloud.meta.get_server_ip( + self.server, ext_tag='floating', public=True) + if ext_ip == self.floating_ip['floating_ip_address']: + self.state = self.COMPLETE + else: + return + + if self.state == self.COMPLETE: + self.complete = True + return OpenStackInstance( + self.adapter.provider, self.server, self.quota) + + +class OpenStackAdapter(statemachine.Adapter): + # If we fail to find an image specified by the config, invalidate + # the image cache after this interval: + IMAGE_CHECK_TIMEOUT = 300 + + def __init__(self, provider_config): + # Wrap these instance methods with a per-instance LRU cache so + # that we don't leak memory over time when the adapter is + # occasionally replaced. 
+ self._findImage = functools.lru_cache(maxsize=None)( + self._findImage) + self._listFlavors = functools.lru_cache(maxsize=None)( + self._listFlavors) + self._findNetwork = functools.lru_cache(maxsize=None)( + self._findNetwork) + self._listAZs = functools.lru_cache(maxsize=None)( + self._listAZs) + + self.log = logging.getLogger( + f"nodepool.OpenStackAdapter.{provider_config.name}") + self.provider = provider_config + + workers = 8 + self.log.info("Create executor with max workers=%s", workers) + self.api_executor = ThreadPoolExecutor( + thread_name_prefix=f'openstack-api-{provider_config.name}', + max_workers=workers) + + self._last_image_check_failure = time.time() + self._last_port_cleanup = None + self._statsd = stats.get_client() + self._client = self._getClient() + + def stop(self): + self.api_executor.shutdown() + + def getCreateStateMachine(self, hostname, label, image_external_id, + metadata, retries, request, az, log): + return OpenStackCreateStateMachine( + self, hostname, label, image_external_id, + metadata, retries, request, az, log) + + def getDeleteStateMachine(self, external_id, log): + return OpenStackDeleteStateMachine(self, external_id, log) + + def listResources(self): + for server in self._listServers(): + if server.status.lower() == 'deleted': + continue + yield OpenStackResource(server.metadata, + 'server', server.id) + # Floating IP and port leakage can't be handled by the + # automatic resource cleanup in cleanupLeakedResources because + # openstack doesn't store metadata on those objects, so we + # call internal cleanup methods here. + if self.provider.port_cleanup_interval: + self._cleanupLeakedPorts() + if self.provider.clean_floating_ips: + self._cleanupFloatingIps() + + def deleteResource(self, resource): + self.log.info(f"Deleting leaked {resource.type}: {resource.id}") + if resource.type == 'server': + self._deleteServer(resource.id) + + def listInstances(self): + for server in self._listServers(): + if server.status.lower() == 'deleted': + continue + flavor = self._getFlavorFromServer(server) + quota = QuotaInformation.construct_from_flavor(flavor) + yield OpenStackInstance(self.provider, server, quota) + + def getQuotaLimits(self): + limits = self._client.get_compute_limits() + return QuotaInformation.construct_from_limits(limits) + + def getQuotaForLabel(self, label): + flavor = self._findFlavor(label.flavor_name, label.min_ram) + return QuotaInformation.construct_from_flavor(flavor) + + def getAZs(self): + azs = self._listAZs() + if not azs: + # If there are no zones, return a list containing None so that + # random.choice can pick None and pass that to Nova. If this + # feels dirty, please direct your ire to policy.json and the + # ability to turn off random portions of the OpenStack API. + return [None] + return azs + + def labelReady(self, label): + if not label.cloud_image: + return False + + # If an image ID was supplied, we'll assume it is ready since + # we don't currently have a way of validating that (except during + # server creation). + if label.cloud_image.image_id: + return True + + image = self._findImage(label.cloud_image.external_name) + if not image: + self.log.warning( + "Provider %s is configured to use %s as the" + " cloud-image for label %s and that" + " cloud-image could not be found in the" + " cloud." 
% (self.provider.name, + label.cloud_image.external_name, + label.name)) + # If the user insists there should be an image but it + # isn't in our cache, invalidate the cache periodically so + # that we can see new cloud image uploads. + if (time.time() - self._last_image_check_failure > + self.IMAGE_CHECK_TIMEOUT): + self._findImage.cache_clear() + self._last_image_check_failure = time.time() + return False + return True + + def uploadImage(self, provider_image, image_name, filename, + image_format, metadata, md5, sha256): + # configure glance and upload image. Note the meta flags + # are provided as custom glance properties + # NOTE: we have wait=True set here. This is not how we normally + # do things in nodepool, preferring to poll ourselves thankyouverymuch. + # However - two things to note: + # - PUT has no aysnc mechanism, so we have to handle it anyway + # - v2 w/task waiting is very strange and complex - but we have to + # block for our v1 clouds anyway, so we might as well + # have the interface be the same and treat faking-out + # a openstacksdk-level fake-async interface later + if not metadata: + metadata = {} + if image_format: + metadata['disk_format'] = image_format + image = self._client.create_image( + name=image_name, + filename=filename, + is_public=False, + wait=True, + md5=md5, + sha256=sha256, + **metadata) + return image.id + + def deleteImage(self, external_id): + self.log.debug(f"Deleting image {external_id}") + return self._client.delete_image(external_id) + + # Local implementation + + def _getClient(self): + rate_limit = None + # nodepool tracks rate limit in time between requests. + # openstacksdk tracks rate limit in requests per second. + # 1/time = requests-per-second. + if self.provider.rate: + rate_limit = 1 / self.provider.rate + return openstack.connection.Connection( + config=self.provider.cloud_config, + use_direct_get=False, + rate_limit=rate_limit, + app_name='nodepool', + app_version=version.version_info.version_string() + ) + + def _submitApi(self, api, *args, **kw): + return self.api_executor.submit( + api, *args, **kw) + + def _completeApi(self, future): + if not future.done(): + return None + return future.result() + + def _createServer(self, name, image, flavor, + az=None, key_name=None, config_drive=True, + networks=None, security_groups=None, + boot_from_volume=False, volume_size=50, + instance_properties=None, userdata=None): + if not networks: + networks = [] + if not isinstance(image, dict): + # if it's a dict, we already have the cloud id. If it's not, + # we don't know if it's name or ID so need to look it up + image = self._findImage(image) + create_args = dict(name=name, + image=image, + flavor=flavor, + config_drive=config_drive) + if boot_from_volume: + create_args['boot_from_volume'] = boot_from_volume + create_args['volume_size'] = volume_size + # NOTE(pabelanger): Always cleanup volumes when we delete a server. 
+ create_args['terminate_volume'] = True + if key_name: + create_args['key_name'] = key_name + if az: + create_args['availability_zone'] = az + if security_groups: + create_args['security_groups'] = security_groups + if userdata: + create_args['userdata'] = userdata + nics = [] + for network in networks: + net_id = self._findNetwork(network)['id'] + nics.append({'net-id': net_id}) + if nics: + create_args['nics'] = nics + if instance_properties: + create_args['meta'] = instance_properties + + try: + return self._client.create_server(wait=False, **create_args) + except openstack.exceptions.BadRequestException: + # We've gotten a 400 error from nova - which means the request + # was malformed. The most likely cause of that, unless something + # became functionally and systemically broken, is stale az, image + # or flavor cache. Log a message, invalidate the caches so that + # next time we get new caches. + self.log.info( + "Clearing az, flavor and image caches due to 400 error " + "from nova") + self._findImage.cache_clear() + self._listFlavors.cache_clear() + self._findNetwork.cache_clear() + self._listAZs.cache_clear() + raise + + # This method is wrapped with an LRU cache in the constructor. + def _listAZs(self): + return self._client.list_availability_zone_names() + + # This method is wrapped with an LRU cache in the constructor. + def _findImage(self, name): + return self._client.get_image(name, filters={'status': 'active'}) + + # This method is wrapped with an LRU cache in the constructor. + def _listFlavors(self): + return self._client.list_flavors(get_extra=False) + + # This method is only used by the nodepool alien-image-list + # command and only works with the openstack driver. + def _listImages(self): + return self._client.list_images() + + def _getFlavors(self): + flavors = self._listFlavors() + flavors.sort(key=operator.itemgetter('ram')) + return flavors + + def _findFlavorByName(self, flavor_name): + for f in self._getFlavors(): + if flavor_name in (f['name'], f['id']): + return f + raise Exception("Unable to find flavor: %s" % flavor_name) + + def _findFlavorByRam(self, min_ram, flavor_name): + for f in self._getFlavors(): + if (f['ram'] >= min_ram + and (not flavor_name or flavor_name in f['name'])): + return f + raise Exception("Unable to find flavor with min ram: %s" % min_ram) + + def _findFlavorById(self, flavor_id): + for f in self._getFlavors(): + if f['id'] == flavor_id: + return f + raise Exception("Unable to find flavor with id: %s" % flavor_id) + + def _findFlavor(self, flavor_name, min_ram): + if min_ram: + return self._findFlavorByRam(min_ram, flavor_name) + else: + return self._findFlavorByName(flavor_name) + + # This method is wrapped with an LRU cache in the constructor. + def _findNetwork(self, name): + network = self._client.get_network(name) + if not network: + raise Exception("Unable to find network %s in provider %s" % ( + name, self.provider.name)) + return network + + @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) + def _listServers(self): + return self._client.list_servers() + + @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) + def _listFloatingIps(self): + return self._client.list_floating_ips() + + def _refreshServer(self, obj): + for server in self._listServers(): + if server.id == obj.id: + return server + return obj + + def _getServer(self, external_id): + for server in self._listServers(): + if server.id == external_id: + return server + return None + + def _getServerByIdNow(self, server_id): + # A synchronous get server by id. 
Only to be used in error + # handling where we can't wait for the list to update. + return self._client.get_server_by_id(server_id) + + def _refreshServerDelete(self, obj): + if obj is None: + return obj + for server in self._listServers(): + if server.id == obj.id: + if server.status.lower() == 'deleted': + return None + return server + return None + + def _refreshFloatingIp(self, obj): + for fip in self._listFloatingIps(): + if fip.id == obj.id: + return fip + return obj + + def _refreshFloatingIpDelete(self, obj): + if obj is None: + return obj + for fip in self._listFloatingIps(): + if fip.id == obj.id: + if fip.status == 'DOWN': + return None + return fip + return obj + + def _needsFloatingIp(self, server): + return self._client._needs_floating_ip( + server=server, nat_destination=None) + + def _createFloatingIp(self, server): + return self._client.create_floating_ip(server=server, wait=True) + + def _attachIpToServer(self, server, fip): + # skip_attach is ignored for nova, which is the only time we + # should actually call this method. + return self._client._attach_ip_to_server( + server=server, floating_ip=fip, + skip_attach=True) + + def _hasFloatingIps(self): + return self._client._has_floating_ips() + + def _getFloatingIps(self, server): + fips = openstack.cloud.meta.find_nova_interfaces( + server['addresses'], ext_tag='floating') + fips = [self._client.get_floating_ip( + id=None, filters={'floating_ip_address': fip['addr']}) + for fip in fips] + return fips + + def _deleteFloatingIp(self, fip): + self._client.delete_floating_ip(fip['id'], retry=0) + + def _deleteServer(self, external_id): + self._client.delete_server(external_id) + + def _getFlavorFromServer(self, server): + # In earlier versions of nova or the sdk, flavor has just an id. + # In later versions it returns the information we're looking for. + # If we get the information we want, we do not need to try to + # lookup the flavor in our list. + if hasattr(server.flavor, 'vcpus'): + return server.flavor + else: + return self._findFlavorById(server.flavor.id) + + # The port cleanup logic. We don't get tags or metadata, so we + # have to figure this out on our own. + + # This method is not cached + def _listPorts(self, status=None): + ''' + List known ports. + + :param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'. + ''' + if status: + ports = self._client.list_ports(filters={'status': status}) + else: + ports = self._client.list_ports() + return ports + + def _filterComputePorts(self, ports): + ''' + Return a list of compute ports (or no device owner). + + We are not interested in ports for routers or DHCP. 
+ ''' + ret = [] + for p in ports: + if (p.device_owner is None or p.device_owner == '' or + p.device_owner.startswith("compute:")): + ret.append(p) + return ret + + def _cleanupLeakedPorts(self): + if not self._last_port_cleanup: + self._last_port_cleanup = time.monotonic() + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + self._down_ports = set([p.id for p in ports]) + return + + # Return if not enough time has passed between cleanup + last_check_in_secs = int(time.monotonic() - self._last_port_cleanup) + if last_check_in_secs <= self.provider.port_cleanup_interval: + return + + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + current_set = set([p.id for p in ports]) + remove_set = current_set & self._down_ports + + removed_count = 0 + for port_id in remove_set: + try: + self._deletePort(port_id) + except Exception: + self.log.exception("Exception deleting port %s in %s:", + port_id, self.provider.name) + else: + removed_count += 1 + self.log.debug("Removed DOWN port %s in %s", + port_id, self.provider.name) + + if self._statsd and removed_count: + key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name) + self._statsd.incr(key, removed_count) + + self._last_port_cleanup = time.monotonic() + + # Rely on OpenStack to tell us the down ports rather than doing our + # own set adjustment. + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + self._down_ports = set([p.id for p in ports]) + + def _deletePort(self, port_id): + self._client.delete_port(port_id) + + def _cleanupFloatingIps(self): + did_clean = self._client.delete_unattached_floating_ips() + if did_clean: + # some openstacksdk's return True if any port was + # cleaned, rather than the count. Just set it to 1 to + # indicate something happened. + if type(did_clean) == bool: + did_clean = 1 + if self._statsd: + key = ('nodepool.provider.%s.leaked.floatingips' + % self.provider.name) + self._statsd.incr(key, did_clean) diff --git a/nodepool/driver/openstack/config.py b/nodepool/driver/openstack/config.py index 6c39a6489..7d63794a7 100644 --- a/nodepool/driver/openstack/config.py +++ b/nodepool/driver/openstack/config.py @@ -29,11 +29,11 @@ class ProviderDiskImage(ConfigValue): self.config_drive = None self.connection_type = None self.connection_port = None + self.username = None + self.python_path = None + self.shell_type = None self.meta = None - def __repr__(self): - return "" % self.name - class ProviderCloudImage(ConfigValue): def __init__(self): @@ -47,9 +47,6 @@ class ProviderCloudImage(ConfigValue): self.connection_type = None self.connection_port = None - def __repr__(self): - return "" % self.name - @property def external_name(self): '''Human readable version of external.''' @@ -76,28 +73,6 @@ class ProviderLabel(ConfigValue): # The ProviderPool object that owns this label. self.pool = None - def __eq__(self, other): - if isinstance(other, ProviderLabel): - # NOTE(Shrews): We intentionally do not compare 'pool' here - # since this causes recursive checks with ProviderPool. 
- return (other.diskimage == self.diskimage and - other.cloud_image == self.cloud_image and - other.min_ram == self.min_ram and - other.flavor_name == self.flavor_name and - other.key_name == self.key_name and - other.name == self.name and - other.console_log == self.console_log and - other.boot_from_volume == self.boot_from_volume and - other.volume_size == self.volume_size and - other.instance_properties == self.instance_properties and - other.userdata == self.userdata and - other.networks == self.networks and - other.host_key_checking == self.host_key_checking) - return False - - def __repr__(self): - return "" % self.name - class ProviderPool(ConfigPool): ignore_equality = ['provider'] @@ -112,6 +87,7 @@ class ProviderPool(ConfigPool): self.security_groups = None self.auto_floating_ip = True self.host_key_checking = True + self.use_internal_ip = False self.labels = None # The OpenStackProviderConfig object that owns this pool. self.provider = None @@ -119,9 +95,6 @@ class ProviderPool(ConfigPool): # Initialize base class attributes super().__init__() - def __repr__(self): - return "" % self.name - def load(self, pool_config, full_config, provider): ''' Load pool configuration options. @@ -259,6 +232,9 @@ class OpenStackProviderConfig(ProviderConfig): diskimage.image_types.add(self.image_type) i.pause = bool(image.get('pause', False)) i.config_drive = image.get('config-drive', None) + i.username = diskimage.username + i.python_path = diskimage.python_path + i.shell_type = diskimage.shell_type i.connection_type = image.get('connection-type', 'ssh') i.connection_port = image.get( 'connection-port', diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py deleted file mode 100644 index fccb0cd01..000000000 --- a/nodepool/driver/openstack/handler.py +++ /dev/null @@ -1,477 +0,0 @@ -# Copyright (C) 2011-2014 OpenStack Foundation -# Copyright 2017 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import math -import pprint -import random - -from kazoo import exceptions as kze -import openstack - -from nodepool import exceptions -from nodepool import nodeutils as utils -from nodepool.zk import zookeeper as zk -from nodepool.driver.utils import NodeLauncher, QuotaInformation -from nodepool.driver import NodeRequestHandler - - -class OpenStackNodeLauncher(NodeLauncher): - def __init__(self, handler, node, provider_config, provider_label, - request): - ''' - Initialize the launcher. - - :param OpenStackNodeRequestHandler handler: The handler object. - :param Node node: A Node object describing the node to launch. - :param ProviderConfig provider_config: A ProviderConfig object - describing the provider launching this node. - :param ProviderLabel provider_label: A ProviderLabel object - describing the label to use for the node. - :param NodeRequest request: The NodeRequest that prompted the - launch. - ''' - super().__init__(handler, node, provider_config) - - # Number of times to retry failed launches. 
- self._retries = provider_config.launch_retries - - self.label = provider_label - self.pool = provider_label.pool - self.request = request - - def _logConsole(self, server_id, hostname): - if not self.label.console_log: - return - console = self.handler.manager.getServerConsole(server_id) - if console: - self.log.info('Console log from hostname %s:' % hostname) - for line in console.splitlines(): - self.log.info(line.rstrip()) - - def _launchNode(self): - if self.label.diskimage: - diskimage = self.provider_config.diskimages[ - self.label.diskimage.name] - else: - diskimage = None - - if diskimage: - # launch using diskimage - cloud_image = self.handler.zk.getMostRecentImageUpload( - diskimage.name, self.provider_config.name) - - if not cloud_image: - raise exceptions.LaunchNodepoolException( - "Unable to find current cloud image %s in %s" % - (diskimage.name, self.provider_config.name) - ) - - config_drive = diskimage.config_drive - # Using a dict with the ID bypasses an image search during - # server creation. - image_external = dict(id=cloud_image.external_id) - image_id = "{path}/{upload_id}".format( - path=self.handler.zk._imageUploadPath( - cloud_image.image_name, - cloud_image.build_id, - cloud_image.provider_name), - upload_id=cloud_image.id) - image_name = diskimage.name - username = cloud_image.username - python_path = cloud_image.python_path - shell_type = cloud_image.shell_type - connection_type = diskimage.connection_type - connection_port = diskimage.connection_port - - else: - # launch using unmanaged cloud image - config_drive = self.label.cloud_image.config_drive - - if self.label.cloud_image.image_id: - # Using a dict with the ID bypasses an image search during - # server creation. - image_external = dict(id=self.label.cloud_image.image_id) - else: - image_external = self.label.cloud_image.external_name - - image_id = self.label.cloud_image.name - image_name = self.label.cloud_image.name - username = self.label.cloud_image.username - python_path = self.label.cloud_image.python_path - shell_type = self.label.cloud_image.shell_type - connection_type = self.label.cloud_image.connection_type - connection_port = self.label.cloud_image.connection_port - - hostname = self.provider_config.hostname_format.format( - label=self.label, provider=self.provider_config, node=self.node - ) - - self.log.info( - "Creating server with hostname %s in %s from image %s" % ( - hostname, self.provider_config.name, image_name)) - - # NOTE: We store the node ID in the server metadata to use for leaked - # instance detection. We cannot use the external server ID for this - # because that isn't available in ZooKeeper until after the server is - # active, which could cause a race in leak detection. 
- - props = self.label.instance_properties.copy() - for k, v in self.label.dynamic_instance_properties.items(): - try: - props[k] = v.format(request=self.request.getSafeAttributes()) - except Exception: - self.log.exception( - "Error formatting dynamic instance property %s", k) - if not props: - props = None - - try: - server = self.handler.manager.createServer( - hostname, - image=image_external, - min_ram=self.label.min_ram, - flavor_name=self.label.flavor_name, - key_name=self.label.key_name, - az=self.node.az, - config_drive=config_drive, - nodepool_node_id=self.node.id, - nodepool_node_label=self.node.type[0], - nodepool_image_name=image_name, - nodepool_pool_name=self.node.pool, - networks=self.label.networks, - security_groups=self.pool.security_groups, - boot_from_volume=self.label.boot_from_volume, - volume_size=self.label.volume_size, - instance_properties=props, - userdata=self.label.userdata) - except openstack.cloud.exc.OpenStackCloudCreateException as e: - if e.resource_id: - self.node.external_id = e.resource_id - # The outer exception handler will handle storing the - # node immediately after this. - raise - - self.node.external_id = server.id - self.node.hostname = hostname - self.node.image_id = image_id - - pool = self.handler.provider.pools.get(self.node.pool) - resources = self.handler.manager.quotaNeededByLabel( - self.node.type[0], pool) - self.node.resources = resources.get_resources() - if username: - self.node.username = username - - self.node.python_path = python_path - self.node.shell_type = shell_type - self.node.connection_type = connection_type - self.node.connection_port = connection_port - - # Checkpoint save the updated node info - self.zk.storeNode(self.node) - - self.log.debug("Waiting for server %s" % server.id) - server = self.handler.manager.waitForServer( - server, self.provider_config.launch_timeout, - auto_ip=self.pool.auto_floating_ip) - - if server.status != 'ACTIVE': - raise exceptions.LaunchStatusException("Server %s for node id: %s " - "status: %s" % - (server.id, self.node.id, - server.status)) - - # If we didn't specify an AZ, set it to the one chosen by Nova. - # Do this after we are done waiting since AZ may not be available - # immediately after the create request. - if not self.node.az: - self.node.az = server.location.zone - - interface_ip = server.interface_ip - if not interface_ip: - self.log.debug( - "Server data for failed IP: %s" % pprint.pformat( - server)) - raise exceptions.LaunchNetworkException( - "Unable to find public IP of server") - - self.node.host_id = server.host_id - self.node.interface_ip = interface_ip - self.node.public_ipv4 = server.public_v4 - self.node.public_ipv6 = server.public_v6 - self.node.private_ipv4 = server.private_v4 - # devstack-gate multi-node depends on private_v4 being populated - # with something. On clouds that don't have a private address, use - # the public. 
- if not self.node.private_ipv4: - self.node.private_ipv4 = server.public_v4 - - # Checkpoint save the updated node info - self.zk.storeNode(self.node) - - self.log.debug( - "Node is running [region: %s, az: %s, ip: %s ipv4: %s, " - "ipv6: %s, hostid: %s]" % - (self.node.region, self.node.az, - self.node.interface_ip, self.node.public_ipv4, - self.node.public_ipv6, self.node.host_id)) - - # wait and scan the new node and record in ZooKeeper - host_keys = [] - if self.label.host_key_checking: - try: - self.log.debug("Gathering host keys") - # only gather host keys if the connection type is ssh or - # network_cli - gather_host_keys = ( - connection_type == 'ssh' or - connection_type == 'network_cli') - host_keys = utils.nodescan( - interface_ip, - timeout=self.provider_config.boot_timeout, - gather_hostkeys=gather_host_keys, - port=connection_port) - - if gather_host_keys and not host_keys: - raise exceptions.LaunchKeyscanException( - "Unable to gather host keys") - except exceptions.ConnectionTimeoutException: - self._logConsole(self.node.external_id, self.node.hostname) - raise - - self.node.host_keys = host_keys - self.zk.storeNode(self.node) - - def launch(self): - attempts = 1 - while attempts <= self._retries: - try: - self._launchNode() - break - except kze.SessionExpiredError: - # If we lost our ZooKeeper session, we've lost our node lock - # so there's no need to continue. - raise - except Exception as e: - if attempts <= self._retries: - self.log.exception("Launch attempt %d/%d failed:", - attempts, self._retries) - - # If we got an external id we need to fetch the server info - # again in order to retrieve the fault reason as this is not - # included in the server object we already have. - quota_exceeded = False - if self.node.external_id: - try: - server = self.handler.manager.getServerById( - self.node.external_id) or {} - fault = server.get('fault', {}).get('message') - if fault: - self.log.error('Detailed node error: %s', fault) - if 'quota' in fault: - quota_exceeded = True - except Exception: - self.log.exception( - 'Failed to retrieve node error information:') - - # If we created an instance, delete it. - if self.node.external_id: - deleting_node = zk.Node() - deleting_node.provider = self.node.provider - deleting_node.pool = self.node.pool - deleting_node.type = self.node.type - deleting_node.external_id = self.node.external_id - deleting_node.state = zk.DELETING - self.zk.storeNode(deleting_node) - self.log.info("Node %s scheduled for cleanup", - deleting_node.external_id) - self.node.external_id = None - self.node.public_ipv4 = None - self.node.public_ipv6 = None - self.node.interface_ip = None - self.zk.storeNode(self.node) - if attempts == self._retries: - raise - if 'quota exceeded' in str(e).lower(): - quota_exceeded = True - if 'number of ports exceeded' in str(e).lower(): - quota_exceeded = True - - if quota_exceeded: - # A quota exception is not directly recoverable so bail - # out immediately with a specific exception. 
- self.log.info("Quota exceeded, invalidating quota cache") - self.handler.manager.invalidateQuotaCache() - raise exceptions.QuotaException("Quota exceeded") - attempts += 1 - - self.node.state = zk.READY - self.zk.storeNode(self.node) - self.log.info("Node is ready") - - -class OpenStackNodeRequestHandler(NodeRequestHandler): - - def __init__(self, pw, request): - super().__init__(pw, request) - self.chosen_az = None - self._threads = [] - - @property - def alive_thread_count(self): - count = 0 - for t in self._threads: - if t.is_alive(): - count += 1 - return count - - def imagesAvailable(self): - ''' - Determines if the requested images are available for this provider. - - ZooKeeper is queried for an image uploaded to the provider that is - in the READY state. - - :returns: True if it is available, False otherwise. - ''' - if self.provider.manage_images: - for label in self.request.node_types: - if self.pool.labels[label].cloud_image: - if not self.manager.labelReady(self.pool.labels[label]): - return False - else: - if not self.zk.getMostRecentImageUpload( - self.pool.labels[label].diskimage.name, - self.provider.name): - return False - return True - - def hasRemainingQuota(self, ntype): - needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool) - - if not self.pool.ignore_provider_quota: - # Calculate remaining quota which is calculated as: - # quota = - - - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed()) - cloud_quota.subtract(needed_quota) - self.log.debug("Predicted remaining provider quota: %s", - cloud_quota) - - if not cloud_quota.non_negative(): - return False - - # Now calculate pool specific quota. Values indicating no quota default - # to math.inf representing infinity that can be calculated with. - pool_quota = QuotaInformation(cores=self.pool.max_cores, - instances=self.pool.max_servers, - ram=self.pool.max_ram, - default=math.inf) - pool_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed(self.pool)) - self.log.debug("Current pool quota: %s" % pool_quota) - pool_quota.subtract(needed_quota) - self.log.debug("Predicted remaining pool quota: %s", pool_quota) - - return pool_quota.non_negative() - - def hasProviderQuota(self, node_types): - needed_quota = QuotaInformation() - - for ntype in node_types: - needed_quota.add( - self.manager.quotaNeededByLabel(ntype, self.pool)) - - if not self.pool.ignore_provider_quota: - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract(needed_quota) - - if not cloud_quota.non_negative(): - return False - - # Now calculate pool specific quota. Values indicating no quota default - # to math.inf representing infinity that can be calculated with. - pool_quota = QuotaInformation(cores=self.pool.max_cores, - instances=self.pool.max_servers, - ram=self.pool.max_ram, - default=math.inf) - pool_quota.subtract(needed_quota) - return pool_quota.non_negative() - - def checkReusableNode(self, node): - if self.chosen_az and node.az != self.chosen_az: - return False - return True - - def nodeReusedNotification(self, node): - """ - We attempt to group the node set within the same provider availability - zone. - For this to work properly, the provider entry in the nodepool - config must list the availability zones. Otherwise, new nodes will be - put in random AZs at nova's whim. The exception being if there is an - existing node in the READY state that we can select for this node set. 
- Its AZ will then be used for new nodes, as well as any other READY - nodes. - """ - # If we haven't already chosen an AZ, select the - # AZ from this ready node. This will cause new nodes - # to share this AZ, as well. - if not self.chosen_az and node.az: - self.chosen_az = node.az - - def setNodeMetadata(self, node): - """ - Select grouping AZ if we didn't set AZ from a selected, - pre-existing node - """ - if not self.chosen_az: - self.chosen_az = random.choice( - self.pool.azs or self.manager.getAZs()) - node.az = self.chosen_az - node.cloud = self.provider.cloud_config.name - node.region = self.provider.region_name - - def launchesComplete(self): - ''' - Check if all launch requests have completed. - - When all of the Node objects have reached a final state (READY, FAILED - or ABORTED), we'll know all threads have finished the launch process. - ''' - if not self._threads: - return True - - # Give the NodeLaunch threads time to finish. - if self.alive_thread_count: - return False - - node_states = [node.state for node in self.nodeset] - - # NOTE: It's very important that NodeLauncher always sets one of - # these states, no matter what. - if not all(s in (zk.READY, zk.FAILED, zk.ABORTED) - for s in node_states): - return False - - return True - - def launch(self, node): - label = self.pool.labels[node.type[0]] - thd = OpenStackNodeLauncher(self, node, self.provider, label, - self.request) - thd.start() - self._threads.append(thd) diff --git a/nodepool/driver/openstack/provider.py b/nodepool/driver/openstack/provider.py deleted file mode 100755 index ed1947e0b..000000000 --- a/nodepool/driver/openstack/provider.py +++ /dev/null @@ -1,659 +0,0 @@ -# Copyright (C) 2011-2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import logging -import operator -import threading -import time - -import openstack -from openstack.exceptions import ResourceTimeout - -from nodepool import exceptions -from nodepool.driver import Provider -from nodepool.driver.utils import QuotaInformation, QuotaSupport -from nodepool.driver.utils import NodeDeleter -from nodepool import stats -from nodepool import version -from nodepool.zk import zookeeper as zk - -# Import entire module to avoid partial-loading, circular import -from nodepool.driver.openstack import handler - - -IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list - - -class OpenStackProvider(Provider, QuotaSupport): - log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider") - - def __init__(self, provider): - super().__init__() - self.provider = provider - self._images = {} - self._networks = {} - self.__flavors = {} # TODO(gtema): caching - self.__azs = None - self._zk = None - self._down_ports = set() - self._last_port_cleanup = None - self._statsd = stats.get_client() - self.running = False - self._server_list_watcher = threading.Thread( - name='ServerListWatcher', target=self._watchServerList, - daemon=True) - self._server_list_watcher_stop_event = threading.Event() - self._cleanup_queue = {} - self._startup_queue = {} - - def start(self, zk_conn): - self.resetClient() - self._zk = zk_conn - self.running = True - self._server_list_watcher.start() - - def stop(self): - self.running = False - self._server_list_watcher_stop_event.set() - - def idle(self): - pass - - def join(self): - self._server_list_watcher.join() - - def getRequestHandler(self, poolworker, request): - return handler.OpenStackNodeRequestHandler(poolworker, request) - - # TODO(gtema): caching - @property - def _flavors(self): - if not self.__flavors: - self.__flavors = self._getFlavors() - return self.__flavors - - def _getClient(self): - rate_limit = None - # nodepool tracks rate limit in time between requests. - # openstacksdk tracks rate limit in requests per second. - # 1/time = requests-per-second. - if self.provider.rate: - rate_limit = 1 / self.provider.rate - return openstack.connection.Connection( - config=self.provider.cloud_config, - use_direct_get=False, - rate_limit=rate_limit, - app_name='nodepool', - app_version=version.version_info.version_string() - ) - - def getProviderLimits(self): - limits = self._client.get_compute_limits() - return QuotaInformation.construct_from_limits(limits) - - def quotaNeededByLabel(self, ntype, pool): - provider_label = pool.labels[ntype] - - flavor = self.findFlavor(provider_label.flavor_name, - provider_label.min_ram) - - return QuotaInformation.construct_from_flavor(flavor) - - def unmanagedQuotaUsed(self): - ''' - Sums up the quota used by servers unmanaged by nodepool. - - :return: Calculated quota in use by unmanaged servers - ''' - flavors = self.listFlavorsById() - used_quota = QuotaInformation() - - node_ids = set([n.id for n in self._zk.nodeIterator()]) - - for server in self.listNodes(): - meta = server.get('metadata', {}) - - nodepool_provider_name = meta.get('nodepool_provider_name') - if (nodepool_provider_name and - nodepool_provider_name == self.provider.name): - # This provider (regardless of the launcher) owns this - # server so it must not be accounted for unmanaged - # quota; unless it has leaked. - nodepool_node_id = meta.get('nodepool_node_id') - # FIXME(tobiash): Add a test case for this - if nodepool_node_id and nodepool_node_id in node_ids: - # It has not leaked. 
- continue - - # In earlier versions of nova or the sdk, flavor has just an id. - # In later versions it returns the information we're looking for. - # If we get the information we want, we do not need to try to - # lookup the flavor in our list. - if hasattr(server.flavor, 'vcpus'): - flavor = server.flavor - else: - flavor = flavors.get(server.flavor.id) - # If we still haven't found the flavor, skip handling this - # server instead of failing completely - if not flavor: - continue - used_quota.add(QuotaInformation.construct_from_flavor(flavor)) - - return used_quota - - def resetClient(self): - self._client = self._getClient() - - def _getFlavors(self): - flavors = self.listFlavors() - flavors.sort(key=operator.itemgetter('ram')) - return flavors - - # TODO(gtema): These next three methods duplicate logic that is in - # openstacksdk, caching is not enabled there by default - # Remove it when caching is default - def _findFlavorByName(self, flavor_name): - for f in self._flavors: - if flavor_name in (f['name'], f['id']): - return f - raise Exception("Unable to find flavor: %s" % flavor_name) - - def _findFlavorByRam(self, min_ram, flavor_name): - for f in self._flavors: - if (f['ram'] >= min_ram - and (not flavor_name or flavor_name in f['name'])): - return f - raise Exception("Unable to find flavor with min ram: %s" % min_ram) - - def findFlavor(self, flavor_name, min_ram): - # Note: this will throw an error if the provider is offline - # but all the callers are in threads (they call in via CreateServer) so - # the mainloop won't be affected. - # TODO(gtema): enable commented block when openstacksdk has caching - # enabled by default - # if min_ram: - # return self._client.get_flavor_by_ram( - # ram=min_ram, - # include=flavor_name, - # get_extra=False) - # else: - # return self._client.get_flavor(flavor_name, get_extra=False) - - if min_ram: - return self._findFlavorByRam(min_ram, flavor_name) - else: - return self._findFlavorByName(flavor_name) - - def findImage(self, name): - if name in self._images: - return self._images[name] - - image = self._client.get_image(name, filters={'status': 'active'}) - self._images[name] = image - return image - - def findNetwork(self, name): - if name in self._networks: - return self._networks[name] - - network = self._client.get_network(name) - if not network: - raise Exception("Unable to find network %s in provider %s" % ( - name, self.provider.name)) - self._networks[name] = network - return network - - def deleteImage(self, name, id): - if name in self._images: - del self._images[name] - - return self._client.delete_image(id) - - def createServer(self, name, image, - flavor_name=None, min_ram=None, - az=None, key_name=None, config_drive=True, - nodepool_node_id=None, nodepool_node_label=None, - nodepool_image_name=None, - nodepool_pool_name=None, - networks=None, security_groups=None, - boot_from_volume=False, volume_size=50, - instance_properties=None, userdata=None): - if not networks: - networks = [] - if not isinstance(image, dict): - # if it's a dict, we already have the cloud id. If it's not, - # we don't know if it's name or ID so need to look it up - image = self.findImage(image) - flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram) - create_args = dict(name=name, - image=image, - flavor=flavor, - config_drive=config_drive) - if boot_from_volume: - create_args['boot_from_volume'] = boot_from_volume - create_args['volume_size'] = volume_size - # NOTE(pabelanger): Always cleanup volumes when we delete a server. 
- create_args['terminate_volume'] = True - if key_name: - create_args['key_name'] = key_name - if az: - create_args['availability_zone'] = az - if security_groups: - create_args['security_groups'] = security_groups - if userdata: - create_args['userdata'] = userdata - nics = [] - for network in networks: - net_id = self.findNetwork(network)['id'] - nics.append({'net-id': net_id}) - if nics: - create_args['nics'] = nics - # Put provider.name and image_name in as groups so that ansible - # inventory can auto-create groups for us based on each of those - # qualities - # Also list each of those values directly so that non-ansible - # consumption programs don't need to play a game of knowing that - # groups[0] is the image name or anything silly like that. - groups_list = [self.provider.name] - - if nodepool_image_name: - groups_list.append(nodepool_image_name) - if nodepool_node_label: - groups_list.append(nodepool_node_label) - meta = dict( - groups=",".join(groups_list), - nodepool_provider_name=self.provider.name, - nodepool_pool_name=nodepool_pool_name, - ) - # merge in any provided properties - if instance_properties: - meta = {**instance_properties, **meta} - if nodepool_node_id: - meta['nodepool_node_id'] = nodepool_node_id - if nodepool_image_name: - meta['nodepool_image_name'] = nodepool_image_name - if nodepool_node_label: - meta['nodepool_node_label'] = nodepool_node_label - create_args['meta'] = meta - - try: - return self._client.create_server(wait=False, **create_args) - except openstack.exceptions.BadRequestException: - # We've gotten a 400 error from nova - which means the request - # was malformed. The most likely cause of that, unless something - # became functionally and systemically broken, is stale az, image - # or flavor cache. Log a message, invalidate the caches so that - # next time we get new caches. - self._images = {} - self.__azs = None - self.__flavors = {} # TODO(gtema): caching - self.log.info( - "Clearing az, flavor and image caches due to 400 error " - "from nova") - raise - - def getServer(self, server_id): - return self._client.get_server(server_id) - - def getServerById(self, server_id): - return self._client.get_server_by_id(server_id) - - def getServerConsole(self, server_id): - try: - return self._client.get_server_console(server_id) - except openstack.exceptions.OpenStackCloudException: - return None - - def waitForServer(self, server, timeout=3600, auto_ip=True): - # This method is called from a separate thread per server. In order to - # reduce thread contention we don't call wait_for_server right now - # but put this thread on sleep until the desired instance is either - # in ACTIVE or ERROR state. After that just continue with - # wait_for_server which will continue its magic. - # TODO: log annotation - self.log.debug('Wait for central server creation %s', server.id) - event = threading.Event() - start_time = time.monotonic() - self._startup_queue[server.id] = (event, start_time + timeout) - if not event.wait(timeout=timeout): - # On timeout emit the same exception as wait_for_server would to - timeout_message = "Timeout waiting for the server to come up." 
- raise ResourceTimeout(timeout_message) - - # TODO: log annotation - self.log.debug('Finished wait for central server creation %s', - server.id) - - # Re-calculate timeout to account for the duration so far - elapsed = time.monotonic() - start_time - timeout = max(0, timeout - elapsed) - - return self._client.wait_for_server( - server=server, auto_ip=auto_ip, - reuse=False, timeout=timeout) - - def waitForNodeCleanup(self, server_id, timeout=600): - event = threading.Event() - self._cleanup_queue[server_id] = (event, time.monotonic() + timeout) - if not event.wait(timeout=timeout): - raise exceptions.ServerDeleteException( - "server %s deletion" % server_id) - - def createImage(self, server, image_name, meta): - return self._client.create_image_snapshot( - image_name, server, **meta) - - def getImage(self, image_id): - return self._client.get_image(image_id, filters={'status': 'active'}) - - def labelReady(self, label): - if not label.cloud_image: - return False - - # If an image ID was supplied, we'll assume it is ready since - # we don't currently have a way of validating that (except during - # server creation). - if label.cloud_image.image_id: - return True - - image = self.getImage(label.cloud_image.external_name) - if not image: - self.log.warning( - "Provider %s is configured to use %s as the" - " cloud-image for label %s and that" - " cloud-image could not be found in the" - " cloud." % (self.provider.name, - label.cloud_image.external_name, - label.name)) - return False - return True - - def uploadImage(self, provider_image, image_name, filename, - image_type=None, meta=None, md5=None, sha256=None): - # configure glance and upload image. Note the meta flags - # are provided as custom glance properties - # NOTE: we have wait=True set here. This is not how we normally - # do things in nodepool, preferring to poll ourselves thankyouverymuch. - # However - two things to note: - # - PUT has no aysnc mechanism, so we have to handle it anyway - # - v2 w/task waiting is very strange and complex - but we have to - # block for our v1 clouds anyway, so we might as well - # have the interface be the same and treat faking-out - # a openstacksdk-level fake-async interface later - if not meta: - meta = {} - if image_type: - meta['disk_format'] = image_type - image = self._client.create_image( - name=image_name, - filename=filename, - is_public=False, - wait=True, - md5=md5, - sha256=sha256, - **meta) - return image.id - - def listPorts(self, status=None): - ''' - List known ports. - - :param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'. 
- ''' - if status: - ports = self._client.list_ports(filters={'status': status}) - else: - ports = self._client.list_ports() - return ports - - def deletePort(self, port_id): - self._client.delete_port(port_id) - - def listImages(self): - return self._client.list_images() - - def listFlavors(self): - return self._client.list_flavors(get_extra=False) - - def listFlavorsById(self): - flavors = {} - for flavor in self._client.list_flavors(get_extra=False): - flavors[flavor.id] = flavor - return flavors - - def listNodes(self): - # list_servers carries the nodepool server list caching logic - return self._client.list_servers() - - def deleteServer(self, server_id): - return self._client.delete_server(server_id, delete_ips=True) - - def startNodeCleanup(self, node): - t = NodeDeleter(self._zk, self, node) - t.start() - return t - - def cleanupNode(self, server_id): - server = self.getServer(server_id) - if not server: - raise exceptions.NotFound() - - self.log.debug('Deleting server %s' % server_id) - self.deleteServer(server_id) - - def cleanupLeakedInstances(self): - ''' - Delete any leaked server instances. - - Remove any servers found in this provider that are not recorded in - the ZooKeeper data. - ''' - - deleting_nodes = {} - - for node in self._zk.nodeIterator(): - if node.state == zk.DELETING: - if node.provider != self.provider.name: - continue - if node.provider not in deleting_nodes: - deleting_nodes[node.provider] = [] - deleting_nodes[node.provider].append(node.external_id) - - for server in self._client.list_servers(bare=True): - meta = server.get('metadata', {}) - - if 'nodepool_provider_name' not in meta: - continue - - if meta['nodepool_provider_name'] != self.provider.name: - # Another launcher, sharing this provider but configured - # with a different name, owns this. - continue - - if (self.provider.name in deleting_nodes and - server.id in deleting_nodes[self.provider.name]): - # Already deleting this node - continue - - if not self._zk.getNode(meta['nodepool_node_id']): - self.log.warning( - "Marking for delete leaked instance %s (%s) in %s " - "(unknown node id %s)", - server.name, server.id, self.provider.name, - meta['nodepool_node_id'] - ) - # Create an artifical node to use for deleting the server. - node = zk.Node() - node.external_id = server.id - node.provider = self.provider.name - node.pool = meta.get('nodepool_pool_name') - node.state = zk.DELETING - self._zk.storeNode(node) - if self._statsd: - key = ('nodepool.provider.%s.leaked.nodes' - % self.provider.name) - self._statsd.incr(key) - - def filterComputePorts(self, ports): - ''' - Return a list of compute ports (or no device owner). - - We are not interested in ports for routers or DHCP. 
- ''' - ret = [] - for p in ports: - if (p.device_owner is None or p.device_owner == '' or - p.device_owner.startswith("compute:")): - ret.append(p) - return ret - - def cleanupLeakedPorts(self): - if not self._last_port_cleanup: - self._last_port_cleanup = time.monotonic() - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - self._down_ports = set([p.id for p in ports]) - return - - # Return if not enough time has passed between cleanup - last_check_in_secs = int(time.monotonic() - self._last_port_cleanup) - if last_check_in_secs <= self.provider.port_cleanup_interval: - return - - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - current_set = set([p.id for p in ports]) - remove_set = current_set & self._down_ports - - removed_count = 0 - for port_id in remove_set: - try: - self.deletePort(port_id) - except Exception: - self.log.exception("Exception deleting port %s in %s:", - port_id, self.provider.name) - else: - removed_count += 1 - self.log.debug("Removed DOWN port %s in %s", - port_id, self.provider.name) - - if self._statsd and removed_count: - key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name) - self._statsd.incr(key, removed_count) - - self._last_port_cleanup = time.monotonic() - - # Rely on OpenStack to tell us the down ports rather than doing our - # own set adjustment. - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - self._down_ports = set([p.id for p in ports]) - - def cleanupLeakedResources(self): - self.cleanupLeakedInstances() - if self.provider.port_cleanup_interval: - self.cleanupLeakedPorts() - if self.provider.clean_floating_ips: - did_clean = self._client.delete_unattached_floating_ips() - if did_clean: - # some openstacksdk's return True if any port was - # cleaned, rather than the count. Just set it to 1 to - # indicate something happened. - if type(did_clean) == bool: - did_clean = 1 - if self._statsd: - key = ('nodepool.provider.%s.leaked.floatingips' - % self.provider.name) - self._statsd.incr(key, did_clean) - - def getAZs(self): - if self.__azs is None: - self.__azs = self._client.list_availability_zone_names() - if not self.__azs: - # If there are no zones, return a list containing None so that - # random.choice can pick None and pass that to Nova. If this - # feels dirty, please direct your ire to policy.json and the - # ability to turn off random portions of the OpenStack API. - self.__azs = [None] - return self.__azs - - def _watchServerList(self): - log = logging.getLogger( - "nodepool.driver.openstack.OpenStackProvider.watcher") - while self.running: - if self._server_list_watcher_stop_event.wait(5): - # We're stopping now so don't wait with any thread for node - # deletion. 
- for event, _ in self._cleanup_queue.values(): - event.set() - for event, _ in self._startup_queue.values(): - event.set() - break - - if not self._cleanup_queue and not self._startup_queue: - # No server deletion to wait for so check can be skipped - continue - - try: - log.debug('Get server list') - start = time.monotonic() - # List bare to avoid neutron calls - servers = self._client.list_servers(bare=True) - log.debug('Got server list in %.3fs', time.monotonic() - start) - except Exception: - log.exception('Failed to get server list') - continue - - def process_timeouts(queue): - for server_id in list(queue.keys()): - # Remove entries that are beyond timeout - _, timeout = queue[server_id] - if time.monotonic() > timeout: - del queue[server_id] - - # Process cleanup queue - existing_server_ids = { - server.id for server in servers - if server.status != 'DELETED' - } - for server_id in list(self._cleanup_queue.keys()): - # Notify waiting threads that don't have server ids - if server_id not in existing_server_ids: - # Notify the thread which is waiting for the delete - log.debug('Waking up cleanup thread for server %s', - server_id) - self._cleanup_queue[server_id][0].set() - del self._cleanup_queue[server_id] - - # Process startup queue - finished_server_ids = { - server.id for server in servers - if server.status in ('ACTIVE', 'ERROR') - } - for server_id in list(self._startup_queue.keys()): - # Notify waiting threads that don't have server ids - if server_id in finished_server_ids: - # Notify the thread which is waiting for the delete - log.debug('Waking up startup thread for server %s', - server_id) - self._startup_queue[server_id][0].set() - del self._startup_queue[server_id] - - # Process timeouts - process_timeouts(self._cleanup_queue) - process_timeouts(self._startup_queue) - - log.debug('Done') diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index ed5d289a9..d2b344c9f 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -17,6 +17,7 @@ import time import logging import math +import random import threading from concurrent.futures.thread import ThreadPoolExecutor @@ -137,7 +138,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): 'nodepool_provider_name': self.manager.provider.name} self.state_machine = self.manager.adapter.getCreateStateMachine( hostname, label, image_external_id, metadata, retries, - self.handler.request, self.log) + self.handler.request, self.handler.chosen_az, self.log) def updateNodeFromInstance(self, instance): if instance is None: @@ -157,6 +158,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): node.public_ipv4 = instance.public_ipv4 node.private_ipv4 = instance.private_ipv4 node.public_ipv6 = instance.public_ipv6 + node.host_id = instance.host_id node.cloud = instance.cloud node.region = instance.region node.az = instance.az @@ -205,20 +207,17 @@ class StateMachineNodeLauncher(stats.StatsReporter): instance = state_machine.advance() self.log.debug(f"State machine for {node.id} at " f"{state_machine.state}") - if not node.external_id: - if not state_machine.external_id: - raise Exception("Driver implementation error: state " - "machine must produce external ID " - "after first advancement") + if not node.external_id and state_machine.external_id: node.external_id = state_machine.external_id self.zk.storeNode(node) if state_machine.complete and not self.keyscan_future: self.updateNodeFromInstance(instance) self.log.debug("Submitting keyscan request for %s", node.interface_ip) + 
label = self.handler.pool.labels[self.node.type[0]] future = self.manager.keyscan_worker.submit( keyscan, - self.handler.pool.host_key_checking, + label.host_key_checking, node.id, node.interface_ip, node.connection_type, node.connection_port, self.manager.provider.boot_timeout) @@ -240,6 +239,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): node.external_id = state_machine.external_id self.zk.storeNode(node) statsd_key = 'error.quota' + self.manager.invalidateQuotaCache() except Exception as e: self.log.exception( "Launch failed for node %s:", node.id) @@ -352,6 +352,7 @@ class StateMachineHandler(NodeRequestHandler): def __init__(self, pw, request): super().__init__(pw, request) + self.chosen_az = None self.launchers = [] @property @@ -364,6 +365,15 @@ class StateMachineHandler(NodeRequestHandler): :returns: True if it is available, False otherwise. ''' + for label in self.request.node_types: + if self.pool.labels[label].cloud_image: + if not self.manager.labelReady(self.pool.labels[label]): + return False + else: + if not self.zk.getMostRecentImageUpload( + self.pool.labels[label].diskimage.name, + self.provider.name): + return False return True def hasProviderQuota(self, node_types): @@ -414,17 +424,18 @@ class StateMachineHandler(NodeRequestHandler): needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool) self.log.debug("Needed quota: %s", needed_quota) - # Calculate remaining quota which is calculated as: - # quota = - - - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed()) - cloud_quota.subtract(needed_quota) - self.log.debug("Predicted remaining provider quota: %s", - cloud_quota) + if not self.pool.ignore_provider_quota: + # Calculate remaining quota which is calculated as: + # quota = - - + cloud_quota = self.manager.estimatedNodepoolQuota() + cloud_quota.subtract( + self.manager.estimatedNodepoolQuotaUsed()) + cloud_quota.subtract(needed_quota) + self.log.debug("Predicted remaining provider quota: %s", + cloud_quota) - if not cloud_quota.non_negative(): - return False + if not cloud_quota.non_negative(): + return False # Now calculate pool specific quota. Values indicating no quota default # to math.inf representing infinity that can be calculated with. @@ -444,6 +455,37 @@ class StateMachineHandler(NodeRequestHandler): return pool_quota.non_negative() + def checkReusableNode(self, node): + if self.chosen_az and node.az != self.chosen_az: + return False + return True + + def nodeReusedNotification(self, node): + """ + We attempt to group the node set within the same provider availability + zone. + For this to work properly, the provider entry in the nodepool + config must list the availability zones. Otherwise, new node placement + will be determined by the cloud. The exception being if there is an + existing node in the READY state that we can select for this node set. + Its AZ will then be used for new nodes, as well as any other READY + nodes. + """ + # If we haven't already chosen an AZ, select the + # AZ from this ready node. This will cause new nodes + # to share this AZ, as well. + if not self.chosen_az and node.az: + self.chosen_az = node.az + + def setNodeMetadata(self, node): + """ + Select grouping AZ if we didn't set AZ from a selected, + pre-existing node + """ + if not self.chosen_az: + self.chosen_az = random.choice( + self.pool.azs or self.manager.adapter.getAZs()) + def launchesComplete(self): ''' Check if all launch requests have completed. 
@@ -495,9 +537,11 @@ class StateMachineProvider(Provider, QuotaSupport): super().start(zk_conn) self.running = True self._zk = zk_conn - self.keyscan_worker = ThreadPoolExecutor() + self.keyscan_worker = ThreadPoolExecutor( + thread_name_prefix=f'keyscan-{self.provider.name}') self.state_machine_thread = threading.Thread( - target=self._runStateMachines) + target=self._runStateMachines, + daemon=True) self.state_machine_thread.start() def stop(self): @@ -555,7 +599,7 @@ class StateMachineProvider(Provider, QuotaSupport): return StateMachineHandler(poolworker, request) def labelReady(self, label): - return True + return self.adapter.labelReady(label) def getProviderLimits(self): try: @@ -745,6 +789,7 @@ class Instance: * cloud: str * az: str * region: str + * host_id: str * driver_data: any * slot: int @@ -769,6 +814,7 @@ class Instance: self.cloud = None self.az = None self.region = None + self.host_id = None self.metadata = {} self.driver_data = None self.slot = None @@ -951,6 +997,36 @@ class Adapter: """ return QuotaInformation(instances=1) + def getAZs(self): + """Return a list of availability zones for this provider + + One of these will be selected at random and supplied to the + create state machine. If a request handler is building a node + set from an existing ready node, then the AZ from that node + will be used instead of the results of this method. + + :returns: A list of availability zone names. + """ + return [None] + + def labelReady(self, label): + """Indicate whether a label is ready in the provided cloud + + This is used by the launcher to determine whether it should + consider a label to be in-service for a provider. If this + returns False, the label will be ignored for this provider. + + This does not need to consider whether a diskimage is ready; + the launcher handles that itself. Instead, this can be used + to determine whether a cloud-image is available. + + :param ProviderLabel label: A config object describing a label + for an instance. + + :returns: A bool indicating whether the label is ready. 
+ """ + return True + # The following methods must be implemented only if image # management is supported: diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index f49593f87..820ed1b61 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -287,6 +287,10 @@ class BaseTestCase(testtools.TestCase): continue if t.name.startswith("ThreadPoolExecutor"): continue + if t.name.startswith("openstack-api"): + continue + if t.name.startswith("keyscan"): + continue if t.name not in whitelist: done = False if done: @@ -584,7 +588,11 @@ class DBTestCase(BaseTestCase): for _ in iterate_timeout(ONE_MINUTE, Exception, "Cloud instance deletion", interval=1): - servers = manager.listNodes() + if hasattr(manager, 'adapter'): + servers = manager.adapter._listServers() + else: + # TODO: remove once all drivers use statemachine + servers = manager.listNodes() if not (instance_id in [s.id for s in servers]): break diff --git a/nodepool/tests/fixtures/node_auto_floating_ip.yaml b/nodepool/tests/fixtures/node_auto_floating_ip.yaml index 84051ed0f..e6ddde11a 100644 --- a/nodepool/tests/fixtures/node_auto_floating_ip.yaml +++ b/nodepool/tests/fixtures/node_auto_floating_ip.yaml @@ -34,7 +34,7 @@ providers: - name: main max-servers: 96 networks: - - 'some-name' + - 'no-auto-ip-network-name' auto-floating-ip: False labels: - name: fake-label1 diff --git a/nodepool/tests/unit/test_builder.py b/nodepool/tests/unit/test_builder.py index 4566f7dfc..77d007ce3 100644 --- a/nodepool/tests/unit/test_builder.py +++ b/nodepool/tests/unit/test_builder.py @@ -20,7 +20,7 @@ import mock import time from nodepool import builder, tests -from nodepool.driver.fake import provider as fakeprovider +from nodepool.driver.fake import adapter as fakeadapter from nodepool.zk import zookeeper as zk from nodepool.config import Config from nodepool.nodeutils import iterate_timeout @@ -173,13 +173,13 @@ class TestNodePoolBuilder(tests.DBTestCase): """Test that image upload fails are handled properly.""" # Now swap out the upload fake so that the next uploads fail - fake_client = fakeprovider.FakeUploadFailCloud(times_to_fail=1) + fake_client = fakeadapter.FakeUploadFailCloud(times_to_fail=1) def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node.yaml') @@ -264,13 +264,13 @@ class TestNodePoolBuilder(tests.DBTestCase): def test_image_removal_dib_deletes_first(self): # Break cloud image deleting - fake_client = fakeprovider.FakeDeleteImageFailCloud() + fake_client = fakeadapter.FakeDeleteImageFailCloud() def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_two_image.yaml') diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index a395fabcc..4b8c79d32 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -121,7 +121,7 @@ class TestNodepoolCMD(tests.DBTestCase): def fail_list(self): raise RuntimeError('Fake list error') self.useFixture(fixtures.MonkeyPatch( - 'nodepool.driver.fake.provider.FakeOpenStackCloud.list_servers', + 'nodepool.driver.fake.adapter.FakeOpenStackCloud.list_servers', fail_list)) configfile = self.setup_config("node_cmd.yaml") diff --git 
a/nodepool/tests/unit/test_driver_metastatic.py b/nodepool/tests/unit/test_driver_metastatic.py index c57627873..ef2a3e92e 100644 --- a/nodepool/tests/unit/test_driver_metastatic.py +++ b/nodepool/tests/unit/test_driver_metastatic.py @@ -86,7 +86,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -151,7 +151,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -166,7 +166,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Allocate a second node, should have same backing node node2 = self._requestNode() @@ -188,7 +188,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -209,7 +209,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Delete the metastatic node and verify that backing is deleted node1.state = zk.DELETING diff --git a/nodepool/tests/unit/test_driver_static.py b/nodepool/tests/unit/test_driver_static.py index 206cacb65..b1581362a 100644 --- a/nodepool/tests/unit/test_driver_static.py +++ b/nodepool/tests/unit/test_driver_static.py @@ -498,7 +498,8 @@ class TestDriverStatic(tests.DBTestCase): self.wait_for_config(pool) manager = pool.getProviderManager('openstack-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") + manager.adapter.IMAGE_CHECK_TIMEOUT = 0 req = zk.NodeRequest() req.state = zk.REQUESTED diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index a0ea800a2..92e021ce2 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -24,7 +24,8 @@ import testtools from nodepool import tests from nodepool.zk import zookeeper as zk from nodepool.zk.components import PoolComponent -from nodepool.driver.fake import provider as fakeprovider +from nodepool.driver.statemachine import StateMachineProvider +from nodepool.driver.fake import adapter as fakeadapter from nodepool.nodeutils import iterate_timeout import nodepool.launcher from nodepool.version import get_version_string @@ -35,6 +36,12 @@ from kazoo import exceptions as kze class TestLauncher(tests.DBTestCase): log = logging.getLogger("nodepool.TestLauncher") + def setUp(self): + super().setUp() + + StateMachineProvider.MINIMUM_SLEEP = 0.1 + StateMachineProvider.MAXIMUM_SLEEP = 1 + def test_node_assignment(self): ''' Successful 
node launch should have unlocked nodes in READY state @@ -89,7 +96,7 @@ class TestLauncher(tests.DBTestCase): # We check the "cloud" side attributes are set from nodepool side provider = pool.getProviderManager('fake-provider') - cloud_node = provider.getServer(node.hostname) + cloud_node = provider.adapter._getServer(node.external_id) self.assertEqual( cloud_node.metadata['nodepool_provider_name'], 'fake-provider') @@ -183,7 +190,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (max_cores, max_instances, max_ram) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -196,7 +203,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() req1 = zk.NodeRequest() req1.state = zk.REQUESTED @@ -412,7 +419,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (math.inf, 1, math.inf) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -472,7 +479,7 @@ class TestLauncher(tests.DBTestCase): nonlocal max_cores, max_instances, max_ram return (max_cores, max_instances, max_ram) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -485,7 +492,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() # Wait for a single node to be created req1 = zk.NodeRequest() @@ -545,7 +552,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails = 2 + manager.adapter.createServer_fails = 2 req = zk.NodeRequest() req.state = zk.REQUESTED @@ -553,7 +560,7 @@ class TestLauncher(tests.DBTestCase): self.zk.storeNodeRequest(req) req = self.waitForNodeRequest(req) - self.assertEqual(0, manager.createServer_fails) + self.assertEqual(0, manager.adapter.createServer_fails) self.assertEqual(req.state, zk.FAILED) self.assertNotEqual(req.declined_by, []) @@ -578,7 +585,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FULFILLED) # now change the azs in the cloud - cloud = pool.getProviderManager('fake-provider')._getClient() + cloud = pool.getProviderManager('fake-provider').adapter._getClient() cloud._azs = ['new-az1', 'new-az2'] # Do a second request. This will fail because the cached azs are not @@ -661,7 +668,7 @@ class TestLauncher(tests.DBTestCase): provider = (builder._upload_workers[0]._config. 
provider_managers['fake-provider']) - cloud_image = provider.getImage(image.external_id) + cloud_image = provider.adapter._findImage(image.external_id) self.assertEqual( cloud_image._kw.get('diskimage_metadata'), 'diskimage') self.assertEqual( @@ -690,7 +697,7 @@ class TestLauncher(tests.DBTestCase): # We check the "cloud" side attributes are set from nodepool side provider = pool.getProviderManager('fake-provider') - cloud_node = provider.getServer(nodes[0].hostname) + cloud_node = provider.adapter._getServer(nodes[0].external_id) self.assertEqual( cloud_node.metadata['nodepool_provider_name'], 'fake-provider') @@ -927,7 +934,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(nodes[0].provider, 'fake-provider') self.assertEqual(len(nodes_def_sg), 1) self.assertEqual(nodes_def_sg[0].provider, 'fake-provider') - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() for server in client._server_list: if server.id == nodes[0].external_id: self.assertEqual(server.security_groups, ['fake-sg']) @@ -1066,7 +1073,7 @@ class TestLauncher(tests.DBTestCase): # Get fake cloud record and set status to DELETING manager = pool.getProviderManager('fake-provider') - for instance in manager.listNodes(): + for instance in manager.adapter._listServers(): if instance.id == nodes[0].external_id: instance.status = 'DELETED' break @@ -1078,7 +1085,7 @@ class TestLauncher(tests.DBTestCase): self.waitForNodeDeletion(nodes[0]) api_record_remains = False - for instance in manager.listNodes(): + for instance in manager.adapter._listServers(): if instance.id == nodes[0].external_id: api_record_remains = True break @@ -1098,7 +1105,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails = 2 + manager.adapter.createServer_fails = 2 self.waitForImage('fake-provider', 'fake-image') req = zk.NodeRequest() @@ -1110,7 +1117,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FAILED) # retries in config is set to 2, so 2 attempts to create a server - self.assertEqual(0, manager.createServer_fails) + self.assertEqual(0, manager.adapter.createServer_fails) def test_node_launch_with_broken_znodes(self): """Test that node launch still works if there are broken znodes""" @@ -1152,7 +1159,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails_with_external_id = 2 + manager.adapter.createServer_fails_with_external_id = 2 self.waitForImage('fake-provider', 'fake-image') # Stop the DeletedNodeWorker so we can make sure the fake znode that @@ -1170,7 +1177,8 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FAILED) # retries in config is set to 2, so 2 attempts to create a server - self.assertEqual(0, manager.createServer_fails_with_external_id) + self.assertEqual( + 0, manager.adapter.createServer_fails_with_external_id) # Request another node to check if nothing is wedged req = zk.NodeRequest() @@ -1186,7 +1194,7 @@ class TestLauncher(tests.DBTestCase): raise RuntimeError('Fake Error') self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, 'deleteServer', fail_delete)) + fakeadapter.FakeAdapter, '_deleteServer', fail_delete)) configfile = self.setup_config('node.yaml') pool = self.useNodepool(configfile, watermark_sleep=1) @@ -1215,10 +1223,10 @@ class 
TestLauncher(tests.DBTestCase): def test_node_delete_error(self): def error_delete(self, name): # Set ERROR status instead of deleting the node - self._getClient()._server_list[0].status = 'ERROR' + self._client._server_list[0].status = 'ERROR' self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, 'deleteServer', error_delete)) + fakeadapter.FakeAdapter, '_deleteServer', error_delete)) configfile = self.setup_config('node_delete_error.yaml') pool = self.useNodepool(configfile, watermark_sleep=1) @@ -1244,14 +1252,10 @@ class TestLauncher(tests.DBTestCase): # wait the cleanup thread to kick in time.sleep(5) - zk_nodes = self.zk.getNodes() - self.assertEqual(len(zk_nodes), 1) - node = self.zk.getNode(zk_nodes[0]) - self.assertEqual(node.state, zk.DELETING) - - # remove error nodes - pool.getProviderManager( - 'fake-provider')._getClient()._server_list.clear() + # Make sure it shows up as leaked + manager = pool.getProviderManager('fake-provider') + instances = list(manager.adapter.listInstances()) + self.assertEqual(1, len(instances)) def test_leaked_node(self): """Test that a leaked node is deleted""" @@ -1267,7 +1271,7 @@ class TestLauncher(tests.DBTestCase): # Make sure we have a node built and ready self.assertEqual(len(nodes), 1) manager = pool.getProviderManager('fake-provider') - servers = manager.listNodes() + servers = manager.adapter._listServers() self.assertEqual(len(servers), 1) # Delete the node from ZooKeeper, but leave the instance @@ -1286,7 +1290,7 @@ class TestLauncher(tests.DBTestCase): self.waitForInstanceDeletion(manager, nodes[0].external_id) # Make sure we end up with only one server (the replacement) - servers = manager.listNodes() + servers = manager.adapter._listServers() self.assertEqual(len(servers), 1) def test_max_ready_age(self): @@ -1599,9 +1603,10 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") - manager._client.create_image(name="fake-image-windows") - manager._client.create_image(name="fake-image-windows-port") + manager.adapter.IMAGE_CHECK_TIMEOUT = 1 + manager.adapter._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image-windows") + manager.adapter._client.create_image(name="fake-image-windows-port") nodes = self.waitForNodes('fake-label') self.assertEqual(len(nodes), 1) @@ -1638,7 +1643,8 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="provider-named-image") + manager.adapter.IMAGE_CHECK_TIMEOUT = 1 + manager.adapter._client.create_image(name="provider-named-image") nodes = self.waitForNodes('fake-label') self.assertEqual(len(nodes), 1) @@ -1936,7 +1942,7 @@ class TestLauncher(tests.DBTestCase): self.wait_for_config(pool) manager = pool.getProviderManager('good-provider') - manager._client.create_image(name="good-image") + manager.adapter._client.create_image(name="good-image") good_req = zk.NodeRequest() good_req.state = zk.REQUESTED @@ -2013,11 +2019,11 @@ class TestLauncher(tests.DBTestCase): time.sleep(1) launcher_pools = self.zk.getRegisteredPools() - @mock.patch('nodepool.driver.openstack.handler.' - 'OpenStackNodeLauncher._launchNode') + @mock.patch('nodepool.driver.statemachine.' + 'StateMachineNodeLauncher.launch') def test_launchNode_session_expired(self, mock_launch): ''' - Test ZK session lost during _launchNode(). 
+ Test ZK session lost during launch(). ''' mock_launch.side_effect = kze.SessionExpiredError() @@ -2044,19 +2050,19 @@ class TestLauncher(tests.DBTestCase): while self.zk.countPoolNodes('fake-provider', 'main'): time.sleep(0) - @mock.patch('nodepool.driver.openstack.provider.' - 'OpenStackProvider.invalidateQuotaCache') + @mock.patch('nodepool.driver.statemachine.' + 'StateMachineProvider.invalidateQuotaCache') def test_launchNode_node_fault_message(self, mock_invalidatequotacache): ''' Test failed launch can get detailed node fault info if available. ''' - fake_client = fakeprovider.FakeLaunchAndGetFaultCloud() + fake_client = fakeadapter.FakeLaunchAndGetFaultCloud() def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_launch_retry.yaml') @@ -2089,20 +2095,19 @@ class TestLauncher(tests.DBTestCase): Test that the launcher keeps trying to spawn a node in case of a delete error ''' - fake_client = fakeprovider.FakeLaunchAndDeleteFailCloud( + fake_client = fakeadapter.FakeLaunchAndDeleteFailCloud( times_to_fail=1) def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_launch_retry.yaml') self.useBuilder(configfile) pool = self.useNodepool(configfile, watermark_sleep=1) - pool.cleanup_interval = 60 pool.start() self.waitForImage('fake-provider', 'fake-image') @@ -2248,7 +2253,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (0, 20, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2283,7 +2288,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (0, 20, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2380,7 +2385,7 @@ class TestLauncher(tests.DBTestCase): return (100, max_instances, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2423,7 +2428,7 @@ class TestLauncher(tests.DBTestCase): return (100, max_instances, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2565,7 +2570,7 @@ class TestLauncher(tests.DBTestCase): self.waitForNodes('fake-label') manager = pool.getProviderManager('fake-provider') - down_ports = manager.listPorts(status='DOWN') + down_ports = manager.adapter._listPorts(status='DOWN') self.assertEqual(2, len(down_ports)) self.log.debug("Down ports: %s", down_ports) @@ -2580,13 +2585,10 @@ class TestLauncher(tests.DBTestCase): except AssertionError: # config still hasn't updated, retry manager = pool.getProviderManager('fake-provider') - # Reset the client as a new fake client will have been - # created. 
- manager.resetClient() for _ in iterate_timeout(4, Exception, 'assert ports are cleaned'): try: - down_ports = manager.listPorts(status='DOWN') + down_ports = manager.adapter._listPorts(status='DOWN') self.assertEqual(0, len(down_ports)) break except AssertionError: diff --git a/nodepool/tests/unit/test_sdk_integration.py b/nodepool/tests/unit/test_sdk_integration.py index 5bb64a869..e7cbe9b6e 100644 --- a/nodepool/tests/unit/test_sdk_integration.py +++ b/nodepool/tests/unit/test_sdk_integration.py @@ -63,7 +63,7 @@ class TestShadeIntegration(tests.IntegrationTestCase): # thread that causes wait_for_threads in subsequent tests to fail. self.addCleanup(pm.stop) pm.start(None) - self.assertEqual(pm._client.auth, auth_data) + self.assertEqual(pm.adapter._client.auth, auth_data) def test_nodepool_occ_config_reload(self): configfile = self.setup_config('integration_occ.yaml') @@ -76,8 +76,8 @@ class TestShadeIntegration(tests.IntegrationTestCase): pool = self.useNodepool(configfile, watermark_sleep=1) pool.updateConfig() - provider_manager = pool.config.provider_managers['real-provider'] - self.assertEqual(provider_manager._client.auth, auth_data) + pm = pool.config.provider_managers['real-provider'] + self.assertEqual(pm.adapter._client.auth, auth_data) # update the config auth_data['password'] = 'os_new_real' @@ -86,5 +86,5 @@ class TestShadeIntegration(tests.IntegrationTestCase): yaml.safe_dump(occ_config, h) pool.updateConfig() - provider_manager = pool.config.provider_managers['real-provider'] - self.assertEqual(provider_manager._client.auth, auth_data) + pm = pool.config.provider_managers['real-provider'] + self.assertEqual(pm.adapter._client.auth, auth_data)
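
For readers new to the statemachine framework, the following is a minimal,
hypothetical sketch of how a driver adapter plugs into the interface touched
above: getAZs() and labelReady() are the new optional hooks, and
getCreateStateMachine() now receives the handler's chosen availability zone.
ExampleAdapter, ExampleCreateStateMachine and the literal values are invented
for illustration; only the method names, the argument order, and the
attributes read by StateMachineNodeLauncher.runStateMachine (state, complete,
external_id, advance()) mirror the code in this change.

from nodepool.driver import statemachine


class ExampleCreateStateMachine:
    """Bare-minimum object providing what runStateMachine() reads:
    advance(), state, complete and external_id."""

    def __init__(self, adapter, hostname, label, az):
        self.adapter = adapter
        self.hostname = hostname
        self.label = label
        self.az = az
        self.state = 'start'
        self.complete = False
        self.external_id = None

    def advance(self):
        # A real driver performs one non-blocking cloud call per advance()
        # and returns a populated statemachine.Instance once the server is
        # active; this stub fakes an immediate success.
        self.external_id = 'example-server-id'  # hypothetical id
        self.state = 'complete'
        self.complete = True
        instance = statemachine.Instance()
        instance.public_ipv4 = '198.51.100.10'  # hypothetical address
        return instance


class ExampleAdapter(statemachine.Adapter):
    def getAZs(self):
        # One of these is chosen at random by the handler's
        # setNodeMetadata() unless the pool lists its own AZs or a reused
        # READY node has already pinned the zone.
        return ['example-az1', 'example-az2']  # hypothetical zone names

    def labelReady(self, label):
        # Cloud-image availability check; diskimage readiness is handled
        # by the launcher itself, so a trivial adapter can return True.
        return True

    def getCreateStateMachine(self, hostname, label, image_external_id,
                              metadata, retries, request, az, log):
        # "az" carries handler.chosen_az so that all nodes of a request
        # land in the same availability zone.
        return ExampleCreateStateMachine(self, hostname, label, az)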