diff --git a/nodepool/cmd/nodepoolcmd.py b/nodepool/cmd/nodepoolcmd.py index 22b62eaab..d3b9af820 100644 --- a/nodepool/cmd/nodepoolcmd.py +++ b/nodepool/cmd/nodepoolcmd.py @@ -256,7 +256,7 @@ class NodePoolCmd(NodepoolApp): # up in alien list since we can't do anything about them # anyway. provider_images = [ - image for image in manager.listImages() + image for image in manager.adapter._listImages() if 'nodepool_build_id' in image['properties']] except Exception as e: log.warning("Exception listing alien images for %s: %s" diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 3e85b9148..dfacc59a6 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -894,6 +894,7 @@ class ConfigPool(ConfigValue, metaclass=abc.ABCMeta): self.node_attributes = None self.priority = None self.ignore_provider_quota = False + self.azs = None @classmethod def getCommonSchemaDict(self): diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py index 54a0621c9..318961109 100644 --- a/nodepool/driver/aws/adapter.py +++ b/nodepool/driver/aws/adapter.py @@ -289,7 +289,7 @@ class AwsAdapter(statemachine.Adapter): self._running = False def getCreateStateMachine(self, hostname, label, image_external_id, - metadata, retries, request, log): + metadata, retries, request, az, log): return AwsCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py index 2c27a4842..2d281ead7 100644 --- a/nodepool/driver/aws/config.py +++ b/nodepool/driver/aws/config.py @@ -176,6 +176,7 @@ class AwsLabel(ConfigValue): self.iam_instance_profile = label.get('iam-instance-profile', None) self.tags = label.get('tags', {}) self.dynamic_tags = label.get('dynamic-tags', {}) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/azure/adapter.py b/nodepool/driver/azure/adapter.py index 474fa52dc..757d8b76c 100644 --- a/nodepool/driver/azure/adapter.py +++ b/nodepool/driver/azure/adapter.py @@ -347,7 +347,7 @@ class AzureAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, - request, log): + request, az, log): return AzureCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/azure/config.py b/nodepool/driver/azure/config.py index bd439443f..87487d8eb 100644 --- a/nodepool/driver/azure/config.py +++ b/nodepool/driver/azure/config.py @@ -172,6 +172,7 @@ class AzureLabel(ConfigValue): self.dynamic_tags = label.get('dynamic-tags', {}) self.user_data = self._encodeData(label.get('user-data', None)) self.custom_data = self._encodeData(label.get('custom-data', None)) + self.host_key_checking = self.pool.host_key_checking def _encodeData(self, s): if not s: diff --git a/nodepool/driver/example/config.py b/nodepool/driver/example/config.py index bb5bd9995..8690431cc 100644 --- a/nodepool/driver/example/config.py +++ b/nodepool/driver/example/config.py @@ -54,6 +54,7 @@ class ProviderLabel(ConfigValue): self.instance_type = None # The ProviderPool object that owns this label. 
self.pool = None + self.host_key_checking = None def __eq__(self, other): if isinstance(other, ProviderLabel): diff --git a/nodepool/driver/fake/__init__.py b/nodepool/driver/fake/__init__.py index 03b5ffd65..f77f1110b 100644 --- a/nodepool/driver/fake/__init__.py +++ b/nodepool/driver/fake/__init__.py @@ -1,34 +1,34 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2022 Acme Gating, LLC # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. from openstack.config import loader -from nodepool.driver import Driver +from nodepool.driver.statemachine import StateMachineDriver from nodepool.driver.fake.config import FakeProviderConfig -from nodepool.driver.fake.provider import FakeProvider +from nodepool.driver.fake.adapter import FakeAdapter -class FakeDriver(Driver): - def __init__(self): - super().__init__() - self.reset() - +class FakeDriver(StateMachineDriver): def reset(self): self.openstack_config = loader.OpenStackConfig() + def __init__(self): + self.reset() + super().__init__() + def getProviderConfig(self, provider): return FakeProviderConfig(self, provider) - def getProvider(self, provider_config): - return FakeProvider(provider_config) + def getAdapter(self, provider_config): + return FakeAdapter(provider_config) diff --git a/nodepool/driver/fake/provider.py b/nodepool/driver/fake/adapter.py similarity index 81% rename from nodepool/driver/fake/provider.py rename to nodepool/driver/fake/adapter.py index 48dcb98f3..7702debeb 100644 --- a/nodepool/driver/fake/provider.py +++ b/nodepool/driver/fake/adapter.py @@ -1,18 +1,17 @@ # Copyright (C) 2011-2013 OpenStack Foundation +# Copyright 2022 Acme Gating, LLC # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. import logging import threading @@ -20,12 +19,11 @@ import time import uuid import openstack.exceptions - -from nodepool import exceptions -from nodepool.driver.openstack.provider import OpenStackProvider -from nodepool.driver.fake.handler import FakeNodeRequestHandler from openstack.cloud.exc import OpenStackCloudCreateException +from nodepool.driver.openstack.adapter import OpenStackAdapter +from nodepool import exceptions + class Dummy(object): IMAGE = 'Image' @@ -56,6 +54,9 @@ class Dummy(object): args = ' '.join(args) return '<%s %s %s>' % (self.__kind, id(self), args) + def __contains__(self, key): + return key in self.__dict__ + def __getitem__(self, key, default=None): return getattr(self, key, default) @@ -90,10 +91,13 @@ class FakeOpenStackCloud(object): ] if networks is None: self.ipv6_network_uuid = uuid.uuid4().hex + self.no_auto_ip_network_uuid = uuid.uuid4().hex networks = [dict(id=uuid.uuid4().hex, name='fake-public-network-name'), dict(id=uuid.uuid4().hex, name='fake-private-network-name'), + dict(id=self.no_auto_ip_network_uuid, + name='no-auto-ip-network-name'), dict(id=self.ipv6_network_uuid, name='fake-ipv6-network-name')] self.networks = networks @@ -112,6 +116,7 @@ class FakeOpenStackCloud(object): Dummy(Dummy.PORT, id=uuid.uuid4().hex, status='DOWN', device_owner=None), ] + self._floating_ip_list = [] def _update_quota(self): self.max_cores, self.max_instances, self.max_ram = FakeOpenStackCloud.\ @@ -141,7 +146,10 @@ class FakeOpenStackCloud(object): addresses = None # if keyword 'ipv6-uuid' is found in provider config, # ipv6 address will be available in public addr dict. + auto_ip = True for nic in nics: + if nic['net-id'] == self.no_auto_ip_network_uuid: + auto_ip = False if nic['net-id'] != self.ipv6_network_uuid: continue addresses = dict( @@ -156,15 +164,20 @@ class FakeOpenStackCloud(object): interface_ip = 'fake_v6' break if not addresses: - addresses = dict( - public=[dict(version=4, addr='fake')], - private=[dict(version=4, addr='fake')] - ) - public_v6 = '' - public_v4 = 'fake' - private_v4 = 'fake' host_id = 'fake' - interface_ip = 'fake' + private_v4 = 'fake' + if auto_ip: + addresses = dict( + public=[dict(version=4, addr='fake')], + private=[dict(version=4, addr='fake')] + ) + public_v6 = '' + public_v4 = 'fake' + interface_ip = 'fake' + else: + public_v4 = '' + public_v6 = '' + interface_ip = private_v4 self._update_quota() over_quota = False if (instance_type == Dummy.INSTANCE and @@ -190,12 +203,13 @@ class FakeOpenStackCloud(object): host_id=host_id, interface_ip=interface_ip, security_groups=security_groups, - location=Dummy(Dummy.LOCATION, zone=kw.get('az')), + location=Dummy(Dummy.LOCATION, zone=az), metadata=kw.get('meta', {}), manager=self, key_name=kw.get('key_name', None), should_fail=should_fail, over_quota=over_quota, + flavor=kw.get('flavor'), event=threading.Event(), _kw=kw) instance_list.append(s) @@ -273,19 +287,22 @@ class FakeOpenStackCloud(object): def get_server_by_id(self, server_id): return self.get_server(server_id) - def _clean_floating_ip(self, server): - server.public_v4 = '' - server.public_v6 = '' - server.interface_ip = server.private_v4 - return server + def list_floating_ips(self): + return self._floating_ip_list - def wait_for_server(self, server, **kwargs): - while server.status == 'BUILD': - time.sleep(0.1) - auto_ip = kwargs.get('auto_ip') - if not auto_ip: - server = self._clean_floating_ip(server) - return 
server + def create_floating_ip(self, server): + fip = Dummy('floating_ips', + id=uuid.uuid4().hex, + floating_ip_address='fake', + status='ACTIVE') + self._floating_ip_list.append(fip) + return fip + + def _needs_floating_ip(self, server, nat_destination): + return False + + def _has_floating_ips(self): + return False def list_servers(self, bare=False): return self._server_list @@ -326,8 +343,8 @@ class FakeOpenStackCloud(object): class FakeUploadFailCloud(FakeOpenStackCloud): log = logging.getLogger("nodepool.FakeUploadFailCloud") - def __init__(self, times_to_fail=None): - super(FakeUploadFailCloud, self).__init__() + def __init__(self, *args, times_to_fail=None, **kw): + super(FakeUploadFailCloud, self).__init__(*args, **kw) self.times_to_fail = times_to_fail self.times_failed = 0 @@ -344,14 +361,16 @@ class FakeUploadFailCloud(FakeOpenStackCloud): class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud): log = logging.getLogger("nodepool.FakeLaunchAndGetFaultCloud") - def __init__(self): - super().__init__() - - def wait_for_server(self, server, **kwargs): + def create_server(self, *args, **kwargs): # OpenStack provider launch code specifically looks for 'quota' in # the failure message. + server = super().create_server( + *args, **kwargs, + done_status='ERROR') + # Don't wait for the async update + server.status = 'ERROR' server.fault = {'message': 'quota server fault'} - raise Exception("wait_for_server failure") + raise OpenStackCloudCreateException('server', server.id) class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): @@ -366,16 +385,18 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): self.launch_success = False self.delete_success = False - def wait_for_server(self, **kwargs): + def create_server(self, *args, **kwargs): if self.times_to_fail_launch is None: raise Exception("Test fail server launch.") if self.times_failed_launch < self.times_to_fail_launch: self.times_failed_launch += 1 - raise exceptions.ServerDeleteException("Test fail server launch.") + # Simulate a failure after the server record is created + ret = super().create_server(*args, **kwargs, done_status='ERROR') + ret.fault = {'message': 'expected error'} + return ret else: self.launch_success = True - return super(FakeLaunchAndDeleteFailCloud, - self).wait_for_server(**kwargs) + return super().create_server(*args, **kwargs) def delete_server(self, *args, **kwargs): if self.times_to_fail_delete is None: @@ -385,8 +406,7 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud): raise exceptions.ServerDeleteException("Test fail server delete.") else: self.delete_success = True - return super(FakeLaunchAndDeleteFailCloud, - self).delete_server(*args, **kwargs) + return super().delete_server(*args, **kwargs) class FakeDeleteImageFailCloud(FakeOpenStackCloud): @@ -404,31 +424,23 @@ class FakeDeleteImageFailCloud(FakeOpenStackCloud): self).delete_image(*args, **kwargs) -class FakeProvider(OpenStackProvider): +class FakeAdapter(OpenStackAdapter): fake_cloud = FakeOpenStackCloud - def __init__(self, provider): + def __init__(self, provider_config): self.createServer_fails = 0 self.createServer_fails_with_external_id = 0 - self.__client = FakeProvider.fake_cloud() - super(FakeProvider, self).__init__(provider) + self.__client = FakeAdapter.fake_cloud() + super().__init__(provider_config) def _getClient(self): return self.__client - def createServer(self, *args, **kwargs): + def _createServer(self, *args, **kwargs): while self.createServer_fails: self.createServer_fails -= 1 raise Exception("Expected 
createServer exception") while self.createServer_fails_with_external_id: self.createServer_fails_with_external_id -= 1 raise OpenStackCloudCreateException('server', 'fakeid') - return super(FakeProvider, self).createServer(*args, **kwargs) - - def getRequestHandler(self, poolworker, request): - return FakeNodeRequestHandler(poolworker, request) - - def start(self, zk_conn): - if self.provider.region_name == 'broken-region': - raise Exception("Broken cloud config") - super().start(zk_conn) + return super()._createServer(*args, **kwargs) diff --git a/nodepool/driver/fake/handler.py b/nodepool/driver/fake/handler.py deleted file mode 100644 index dbd949c55..000000000 --- a/nodepool/driver/fake/handler.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2017 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler - - -class FakeNodeRequestHandler(OpenStackNodeRequestHandler): - launcher_id = "Fake" diff --git a/nodepool/driver/gce/adapter.py b/nodepool/driver/gce/adapter.py index be1ffca73..268d755f0 100644 --- a/nodepool/driver/gce/adapter.py +++ b/nodepool/driver/gce/adapter.py @@ -162,7 +162,7 @@ class GceAdapter(statemachine.Adapter): self.provider.rate) def getCreateStateMachine(self, hostname, label, image_external_id, - metadata, retries, request, log): + metadata, retries, request, az, log): return GceCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, request, log) diff --git a/nodepool/driver/gce/config.py b/nodepool/driver/gce/config.py index 0f025a557..c5817b55e 100644 --- a/nodepool/driver/gce/config.py +++ b/nodepool/driver/gce/config.py @@ -61,6 +61,7 @@ class GceLabel(ConfigValue): self.volume_type = label.get('volume-type', 'pd-standard') self.volume_size = label.get('volume-size', '10') self.diskimage = None + self.host_key_checking = self.pool.host_key_checking class GcePool(ConfigPool): diff --git a/nodepool/driver/ibmvpc/adapter.py b/nodepool/driver/ibmvpc/adapter.py index 404a3d18d..f526ba08d 100644 --- a/nodepool/driver/ibmvpc/adapter.py +++ b/nodepool/driver/ibmvpc/adapter.py @@ -381,7 +381,7 @@ class IBMVPCAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, - request, log): + request, az, log): return IBMVPCCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) diff --git a/nodepool/driver/ibmvpc/config.py b/nodepool/driver/ibmvpc/config.py index c624cccfb..57dbfbb87 100644 --- a/nodepool/driver/ibmvpc/config.py +++ b/nodepool/driver/ibmvpc/config.py @@ -162,6 +162,7 @@ class IBMVPCLabel(ConfigValue): self.profile = label['profile'] self.user_data = label.get('user-data', None) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/metastatic/adapter.py b/nodepool/driver/metastatic/adapter.py index 558ec7a61..b99a29b38 100644 --- a/nodepool/driver/metastatic/adapter.py +++ b/nodepool/driver/metastatic/adapter.py @@ -277,7 
+277,7 @@ class MetastaticAdapter(statemachine.Adapter): def getCreateStateMachine(self, hostname, label, image_external_id, metadata, retries, - request, log): + request, az, log): return MetastaticCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) diff --git a/nodepool/driver/metastatic/config.py b/nodepool/driver/metastatic/config.py index 443db4d84..f4863a71f 100644 --- a/nodepool/driver/metastatic/config.py +++ b/nodepool/driver/metastatic/config.py @@ -45,6 +45,7 @@ class MetastaticLabel(ConfigValue): self.cloud_image = MetastaticCloudImage() self.max_parallel_jobs = label.get('max-parallel-jobs', 1) self.grace_time = label.get('grace-time', 60) + self.host_key_checking = self.pool.host_key_checking @staticmethod def getSchema(): diff --git a/nodepool/driver/openstack/__init__.py b/nodepool/driver/openstack/__init__.py index add696f52..178d7a09e 100644 --- a/nodepool/driver/openstack/__init__.py +++ b/nodepool/driver/openstack/__init__.py @@ -1,34 +1,34 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2022 Acme Gating, LLC # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. from openstack.config import loader -from nodepool.driver import Driver +from nodepool.driver.statemachine import StateMachineDriver from nodepool.driver.openstack.config import OpenStackProviderConfig -from nodepool.driver.openstack.provider import OpenStackProvider +from nodepool.driver.openstack.adapter import OpenStackAdapter -class OpenStackDriver(Driver): - def __init__(self): - super().__init__() - self.reset() - +class OpenStackDriver(StateMachineDriver): def reset(self): self.openstack_config = loader.OpenStackConfig() + def __init__(self): + self.reset() + super().__init__() + def getProviderConfig(self, provider): return OpenStackProviderConfig(self, provider) - def getProvider(self, provider_config): - return OpenStackProvider(provider_config) + def getAdapter(self, provider_config): + return OpenStackAdapter(provider_config) diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py new file mode 100644 index 000000000..2629ef3fe --- /dev/null +++ b/nodepool/driver/openstack/adapter.py @@ -0,0 +1,801 @@ +# Copyright (C) 2011-2013 OpenStack Foundation +# Copyright 2017 Red Hat +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent.futures import ThreadPoolExecutor +import functools +import logging +import time +import operator + +import cachetools.func +import openstack + +from nodepool.driver.utils import QuotaInformation +from nodepool.driver import statemachine +from nodepool import exceptions +from nodepool import stats +from nodepool import version + +CACHE_TTL = 10 + + +class OpenStackInstance(statemachine.Instance): + def __init__(self, provider, server, quota): + super().__init__() + self.external_id = server.id + self.metadata = server.metadata + self.private_ipv4 = server.private_v4 + self.private_ipv6 = None + self.public_ipv4 = server.public_v4 + self.public_ipv6 = server.public_v6 + self.host_id = server.host_id + self.cloud = provider.cloud_config.name + self.region = provider.region_name + self.az = server.location.zone + + self.interface_ip = server.interface_ip + self.quota = quota + + def getQuotaInformation(self): + return self.quota + + +class OpenStackResource(statemachine.Resource): + def __init__(self, metadata, type, id): + super().__init__(metadata) + self.type = type + self.id = id + + +class OpenStackDeleteStateMachine(statemachine.StateMachine): + FLOATING_IP_DELETING = 'deleting floating ip' + SERVER_DELETE = 'delete server' + SERVER_DELETING = 'deleting server' + COMPLETE = 'complete' + + def __init__(self, adapter, external_id, log): + self.log = log + super().__init__() + self.adapter = adapter + self.external_id = external_id + self.floating_ips = None + + def advance(self): + if self.state == self.START: + self.server = self.adapter._getServer(self.external_id) + if (self.server and + self.adapter._hasFloatingIps() and + self.server.addresses): + self.floating_ips = self.adapter._getFloatingIps(self.server) + for fip in self.floating_ips: + self.adapter._deleteFloatingIp(fip) + self.state = self.FLOATING_IP_DELETING + if not self.floating_ips: + self.state = self.SERVER_DELETE + + if self.state == self.FLOATING_IP_DELETING: + fips = [] + for fip in self.floating_ips: + fip = self.adapter._refreshFloatingIpDelete(fip) + if not fip or fip['status'] == 'DOWN': + fip = None + if fip: + fips.append(fip) + self.floating_ips = fips + if self.floating_ips: + return + else: + self.state = self.SERVER_DELETE + + if self.state == self.SERVER_DELETE: + self.adapter._deleteServer(self.external_id) + self.state = self.SERVER_DELETING + + if self.state == self.SERVER_DELETING: + self.server = self.adapter._refreshServerDelete(self.server) + if self.server: + return + else: + self.state = self.COMPLETE + + if self.state == self.COMPLETE: + self.complete = True + + +class OpenStackCreateStateMachine(statemachine.StateMachine): + SERVER_CREATING_SUBMIT = 'submit creating server' + SERVER_CREATING = 'creating server' + SERVER_RETRY = 'retrying server creation' + SERVER_RETRY_DELETING = 'deleting server for retry' + FLOATING_IP_CREATING = 'creating floating ip' + FLOATING_IP_ATTACHING = 'attaching floating ip' + COMPLETE = 'complete' + + def __init__(self, adapter, hostname, label, image_external_id, + metadata, retries, request, az, log): + self.log = log + 
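+        # 'az' is the availability zone chosen for this node by the
+        # launcher (possibly None, in which case the cloud picks one);
+        # it is stored on the state machine and passed to the server
+        # create call as 'availability_zone' in advance().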
super().__init__() + self.adapter = adapter + self.provider = adapter.provider + self.retries = retries + self.attempts = 0 + self.label = label + self.server = None + self.hostname = hostname + self.request = request + self.az = az + + if image_external_id: + self.image_external = image_external_id + diskimage = self.provider.diskimages[label.diskimage.name] + self.config_drive = diskimage.config_drive + image_name = diskimage.name + else: + # launch using unmanaged cloud image + self.config_drive = label.cloud_image.config_drive + + if label.cloud_image.image_id: + # Using a dict with the ID bypasses an image search during + # server creation. + self.image_external = dict(id=label.cloud_image.image_id) + else: + self.image_external = label.cloud_image.external_name + image_name = label.cloud_image.name + + props = label.instance_properties.copy() + for k, v in label.dynamic_instance_properties.items(): + try: + props[k] = v.format(request=self.request.getSafeAttributes()) + except Exception: + self.log.exception( + "Error formatting dynamic instance property %s", k) + if not props: + props = None + + # Put provider.name and image_name in as groups so that ansible + # inventory can auto-create groups for us based on each of those + # qualities + # Also list each of those values directly so that non-ansible + # consumption programs don't need to play a game of knowing that + # groups[0] is the image name or anything silly like that. + groups_list = [self.provider.name] + groups_list.append(image_name) + groups_list.append(label.name) + meta = dict( + groups=",".join(groups_list), + ) + # merge in any instance properties provided from config + if props: + meta.update(props) + # merge nodepool-internal metadata + meta.update(metadata) + self.metadata = meta + self.flavor = self.adapter._findFlavor( + flavor_name=self.label.flavor_name, + min_ram=self.label.min_ram) + self.quota = QuotaInformation.construct_from_flavor(self.flavor) + self.external_id = None + + def _handleServerFault(self): + if not self.external_id: + return + try: + server = self.adapter._getServerByIdNow(self.external_id) + if not server: + return + fault = server.get('fault', {}).get('message') + if fault: + self.log.error('Detailed node error: %s', fault) + if 'quota' in fault: + self.quota_exceeded = True + except Exception: + self.log.exception( + 'Failed to retrieve node error information:') + + def advance(self): + if self.state == self.START: + self.external_id = None + self.quota_exceeded = False + self.create_future = self.adapter._submitApi( + self.adapter._createServer, + self.hostname, + image=self.image_external, + flavor=self.flavor, + key_name=self.label.key_name, + az=self.az, + config_drive=self.config_drive, + networks=self.label.networks, + security_groups=self.label.pool.security_groups, + boot_from_volume=self.label.boot_from_volume, + volume_size=self.label.volume_size, + instance_properties=self.metadata, + userdata=self.label.userdata) + self.state = self.SERVER_CREATING_SUBMIT + + if self.state == self.SERVER_CREATING_SUBMIT: + try: + try: + self.server = self.adapter._completeApi(self.create_future) + if self.server is None: + return + self.external_id = self.server.id + self.state = self.SERVER_CREATING + except openstack.cloud.exc.OpenStackCloudCreateException as e: + if e.resource_id: + self.external_id = e.resource_id + self._handleServerFault() + raise + except Exception as e: + self.log.exception("Launch attempt %d/%d failed:", + self.attempts, self.retries) + if 'quota exceeded' in 
str(e).lower(): + self.quota_exceeded = True + if 'number of ports exceeded' in str(e).lower(): + self.quota_exceeded = True + self.state = self.SERVER_RETRY + + if self.state == self.SERVER_CREATING: + self.server = self.adapter._refreshServer(self.server) + + if self.server.status == 'ACTIVE': + if (self.label.pool.auto_floating_ip and + self.adapter._needsFloatingIp(self.server)): + self.floating_ip = self.adapter._createFloatingIp( + self.server) + self.state = self.FLOATING_IP_CREATING + else: + self.state = self.COMPLETE + elif self.server.status == 'ERROR': + if ('fault' in self.server and self.server['fault'] is not None + and 'message' in self.server['fault']): + self.log.error( + "Error in creating the server." + " Compute service reports fault: {reason}".format( + reason=self.server['fault']['message'])) + if self.external_id: + try: + self.server = self.adapter._deleteServer( + self.external_id) + except Exception: + self.log.exception("Error deleting server:") + self.server = None + else: + self.server = None + self.state = self.SERVER_RETRY + else: + return + + if self.state == self.SERVER_RETRY: + if self.external_id: + try: + self.server = self.adapter._deleteServer(self.external_id) + except Exception: + self.log.exception("Error deleting server:") + # We must keep trying the delete until timeout in + # order to avoid having two servers for the same + # node id. + return + else: + self.server = None + self.state = self.SERVER_RETRY_DELETING + + if self.state == self.SERVER_RETRY_DELETING: + self.server = self.adapter._refreshServerDelete(self.server) + if self.server: + return + self.attempts += 1 + if self.attempts >= self.retries: + raise Exception("Too many retries") + if self.quota_exceeded: + # A quota exception is not directly recoverable so bail + # out immediately with a specific exception. + self.log.info("Quota exceeded, invalidating quota cache") + raise exceptions.QuotaException("Quota exceeded") + self.state = self.START + return + + if self.state == self.FLOATING_IP_CREATING: + self.floating_ip = self.adapter._refreshFloatingIp( + self.floating_ip) + if self.floating_ip.get('port_id', None): + if self.floating_ip['status'] == 'ACTIVE': + self.state = self.FLOATING_IP_ATTACHING + else: + return + else: + self.adapter._attachIpToServer(self.server, self.floating_ip) + self.state = self.FLOATING_IP_ATTACHING + + if self.state == self.FLOATING_IP_ATTACHING: + self.server = self.adapter._refreshServer(self.server) + ext_ip = openstack.cloud.meta.get_server_ip( + self.server, ext_tag='floating', public=True) + if ext_ip == self.floating_ip['floating_ip_address']: + self.state = self.COMPLETE + else: + return + + if self.state == self.COMPLETE: + self.complete = True + return OpenStackInstance( + self.adapter.provider, self.server, self.quota) + + +class OpenStackAdapter(statemachine.Adapter): + # If we fail to find an image specified by the config, invalidate + # the image cache after this interval: + IMAGE_CHECK_TIMEOUT = 300 + + def __init__(self, provider_config): + # Wrap these instance methods with a per-instance LRU cache so + # that we don't leak memory over time when the adapter is + # occasionally replaced. 
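+        # Binding the caches to this instance (rather than decorating
+        # the methods at class level) means the cached results are
+        # released together with the adapter when it is replaced.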
+ self._findImage = functools.lru_cache(maxsize=None)( + self._findImage) + self._listFlavors = functools.lru_cache(maxsize=None)( + self._listFlavors) + self._findNetwork = functools.lru_cache(maxsize=None)( + self._findNetwork) + self._listAZs = functools.lru_cache(maxsize=None)( + self._listAZs) + + self.log = logging.getLogger( + f"nodepool.OpenStackAdapter.{provider_config.name}") + self.provider = provider_config + + workers = 8 + self.log.info("Create executor with max workers=%s", workers) + self.api_executor = ThreadPoolExecutor( + thread_name_prefix=f'openstack-api-{provider_config.name}', + max_workers=workers) + + self._last_image_check_failure = time.time() + self._last_port_cleanup = None + self._statsd = stats.get_client() + self._client = self._getClient() + + def stop(self): + self.api_executor.shutdown() + + def getCreateStateMachine(self, hostname, label, image_external_id, + metadata, retries, request, az, log): + return OpenStackCreateStateMachine( + self, hostname, label, image_external_id, + metadata, retries, request, az, log) + + def getDeleteStateMachine(self, external_id, log): + return OpenStackDeleteStateMachine(self, external_id, log) + + def listResources(self): + for server in self._listServers(): + if server.status.lower() == 'deleted': + continue + yield OpenStackResource(server.metadata, + 'server', server.id) + # Floating IP and port leakage can't be handled by the + # automatic resource cleanup in cleanupLeakedResources because + # openstack doesn't store metadata on those objects, so we + # call internal cleanup methods here. + if self.provider.port_cleanup_interval: + self._cleanupLeakedPorts() + if self.provider.clean_floating_ips: + self._cleanupFloatingIps() + + def deleteResource(self, resource): + self.log.info(f"Deleting leaked {resource.type}: {resource.id}") + if resource.type == 'server': + self._deleteServer(resource.id) + + def listInstances(self): + for server in self._listServers(): + if server.status.lower() == 'deleted': + continue + flavor = self._getFlavorFromServer(server) + quota = QuotaInformation.construct_from_flavor(flavor) + yield OpenStackInstance(self.provider, server, quota) + + def getQuotaLimits(self): + limits = self._client.get_compute_limits() + return QuotaInformation.construct_from_limits(limits) + + def getQuotaForLabel(self, label): + flavor = self._findFlavor(label.flavor_name, label.min_ram) + return QuotaInformation.construct_from_flavor(flavor) + + def getAZs(self): + azs = self._listAZs() + if not azs: + # If there are no zones, return a list containing None so that + # random.choice can pick None and pass that to Nova. If this + # feels dirty, please direct your ire to policy.json and the + # ability to turn off random portions of the OpenStack API. + return [None] + return azs + + def labelReady(self, label): + if not label.cloud_image: + return False + + # If an image ID was supplied, we'll assume it is ready since + # we don't currently have a way of validating that (except during + # server creation). + if label.cloud_image.image_id: + return True + + image = self._findImage(label.cloud_image.external_name) + if not image: + self.log.warning( + "Provider %s is configured to use %s as the" + " cloud-image for label %s and that" + " cloud-image could not be found in the" + " cloud." 
% (self.provider.name, + label.cloud_image.external_name, + label.name)) + # If the user insists there should be an image but it + # isn't in our cache, invalidate the cache periodically so + # that we can see new cloud image uploads. + if (time.time() - self._last_image_check_failure > + self.IMAGE_CHECK_TIMEOUT): + self._findImage.cache_clear() + self._last_image_check_failure = time.time() + return False + return True + + def uploadImage(self, provider_image, image_name, filename, + image_format, metadata, md5, sha256): + # configure glance and upload image. Note the meta flags + # are provided as custom glance properties + # NOTE: we have wait=True set here. This is not how we normally + # do things in nodepool, preferring to poll ourselves thankyouverymuch. + # However - two things to note: + # - PUT has no aysnc mechanism, so we have to handle it anyway + # - v2 w/task waiting is very strange and complex - but we have to + # block for our v1 clouds anyway, so we might as well + # have the interface be the same and treat faking-out + # a openstacksdk-level fake-async interface later + if not metadata: + metadata = {} + if image_format: + metadata['disk_format'] = image_format + image = self._client.create_image( + name=image_name, + filename=filename, + is_public=False, + wait=True, + md5=md5, + sha256=sha256, + **metadata) + return image.id + + def deleteImage(self, external_id): + self.log.debug(f"Deleting image {external_id}") + return self._client.delete_image(external_id) + + # Local implementation + + def _getClient(self): + rate_limit = None + # nodepool tracks rate limit in time between requests. + # openstacksdk tracks rate limit in requests per second. + # 1/time = requests-per-second. + if self.provider.rate: + rate_limit = 1 / self.provider.rate + return openstack.connection.Connection( + config=self.provider.cloud_config, + use_direct_get=False, + rate_limit=rate_limit, + app_name='nodepool', + app_version=version.version_info.version_string() + ) + + def _submitApi(self, api, *args, **kw): + return self.api_executor.submit( + api, *args, **kw) + + def _completeApi(self, future): + if not future.done(): + return None + return future.result() + + def _createServer(self, name, image, flavor, + az=None, key_name=None, config_drive=True, + networks=None, security_groups=None, + boot_from_volume=False, volume_size=50, + instance_properties=None, userdata=None): + if not networks: + networks = [] + if not isinstance(image, dict): + # if it's a dict, we already have the cloud id. If it's not, + # we don't know if it's name or ID so need to look it up + image = self._findImage(image) + create_args = dict(name=name, + image=image, + flavor=flavor, + config_drive=config_drive) + if boot_from_volume: + create_args['boot_from_volume'] = boot_from_volume + create_args['volume_size'] = volume_size + # NOTE(pabelanger): Always cleanup volumes when we delete a server. 
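+            # terminate_volume asks the cloud to delete the boot volume
+            # along with the server, so boot-from-volume nodes do not
+            # leak volumes on deletion.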
+ create_args['terminate_volume'] = True + if key_name: + create_args['key_name'] = key_name + if az: + create_args['availability_zone'] = az + if security_groups: + create_args['security_groups'] = security_groups + if userdata: + create_args['userdata'] = userdata + nics = [] + for network in networks: + net_id = self._findNetwork(network)['id'] + nics.append({'net-id': net_id}) + if nics: + create_args['nics'] = nics + if instance_properties: + create_args['meta'] = instance_properties + + try: + return self._client.create_server(wait=False, **create_args) + except openstack.exceptions.BadRequestException: + # We've gotten a 400 error from nova - which means the request + # was malformed. The most likely cause of that, unless something + # became functionally and systemically broken, is stale az, image + # or flavor cache. Log a message, invalidate the caches so that + # next time we get new caches. + self.log.info( + "Clearing az, flavor and image caches due to 400 error " + "from nova") + self._findImage.cache_clear() + self._listFlavors.cache_clear() + self._findNetwork.cache_clear() + self._listAZs.cache_clear() + raise + + # This method is wrapped with an LRU cache in the constructor. + def _listAZs(self): + return self._client.list_availability_zone_names() + + # This method is wrapped with an LRU cache in the constructor. + def _findImage(self, name): + return self._client.get_image(name, filters={'status': 'active'}) + + # This method is wrapped with an LRU cache in the constructor. + def _listFlavors(self): + return self._client.list_flavors(get_extra=False) + + # This method is only used by the nodepool alien-image-list + # command and only works with the openstack driver. + def _listImages(self): + return self._client.list_images() + + def _getFlavors(self): + flavors = self._listFlavors() + flavors.sort(key=operator.itemgetter('ram')) + return flavors + + def _findFlavorByName(self, flavor_name): + for f in self._getFlavors(): + if flavor_name in (f['name'], f['id']): + return f + raise Exception("Unable to find flavor: %s" % flavor_name) + + def _findFlavorByRam(self, min_ram, flavor_name): + for f in self._getFlavors(): + if (f['ram'] >= min_ram + and (not flavor_name or flavor_name in f['name'])): + return f + raise Exception("Unable to find flavor with min ram: %s" % min_ram) + + def _findFlavorById(self, flavor_id): + for f in self._getFlavors(): + if f['id'] == flavor_id: + return f + raise Exception("Unable to find flavor with id: %s" % flavor_id) + + def _findFlavor(self, flavor_name, min_ram): + if min_ram: + return self._findFlavorByRam(min_ram, flavor_name) + else: + return self._findFlavorByName(flavor_name) + + # This method is wrapped with an LRU cache in the constructor. + def _findNetwork(self, name): + network = self._client.get_network(name) + if not network: + raise Exception("Unable to find network %s in provider %s" % ( + name, self.provider.name)) + return network + + @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) + def _listServers(self): + return self._client.list_servers() + + @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) + def _listFloatingIps(self): + return self._client.list_floating_ips() + + def _refreshServer(self, obj): + for server in self._listServers(): + if server.id == obj.id: + return server + return obj + + def _getServer(self, external_id): + for server in self._listServers(): + if server.id == external_id: + return server + return None + + def _getServerByIdNow(self, server_id): + # A synchronous get server by id. 
Only to be used in error + # handling where we can't wait for the list to update. + return self._client.get_server_by_id(server_id) + + def _refreshServerDelete(self, obj): + if obj is None: + return obj + for server in self._listServers(): + if server.id == obj.id: + if server.status.lower() == 'deleted': + return None + return server + return None + + def _refreshFloatingIp(self, obj): + for fip in self._listFloatingIps(): + if fip.id == obj.id: + return fip + return obj + + def _refreshFloatingIpDelete(self, obj): + if obj is None: + return obj + for fip in self._listFloatingIps(): + if fip.id == obj.id: + if fip.status == 'DOWN': + return None + return fip + return obj + + def _needsFloatingIp(self, server): + return self._client._needs_floating_ip( + server=server, nat_destination=None) + + def _createFloatingIp(self, server): + return self._client.create_floating_ip(server=server, wait=True) + + def _attachIpToServer(self, server, fip): + # skip_attach is ignored for nova, which is the only time we + # should actually call this method. + return self._client._attach_ip_to_server( + server=server, floating_ip=fip, + skip_attach=True) + + def _hasFloatingIps(self): + return self._client._has_floating_ips() + + def _getFloatingIps(self, server): + fips = openstack.cloud.meta.find_nova_interfaces( + server['addresses'], ext_tag='floating') + fips = [self._client.get_floating_ip( + id=None, filters={'floating_ip_address': fip['addr']}) + for fip in fips] + return fips + + def _deleteFloatingIp(self, fip): + self._client.delete_floating_ip(fip['id'], retry=0) + + def _deleteServer(self, external_id): + self._client.delete_server(external_id) + + def _getFlavorFromServer(self, server): + # In earlier versions of nova or the sdk, flavor has just an id. + # In later versions it returns the information we're looking for. + # If we get the information we want, we do not need to try to + # lookup the flavor in our list. + if hasattr(server.flavor, 'vcpus'): + return server.flavor + else: + return self._findFlavorById(server.flavor.id) + + # The port cleanup logic. We don't get tags or metadata, so we + # have to figure this out on our own. + + # This method is not cached + def _listPorts(self, status=None): + ''' + List known ports. + + :param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'. + ''' + if status: + ports = self._client.list_ports(filters={'status': status}) + else: + ports = self._client.list_ports() + return ports + + def _filterComputePorts(self, ports): + ''' + Return a list of compute ports (or no device owner). + + We are not interested in ports for routers or DHCP. 
+ ''' + ret = [] + for p in ports: + if (p.device_owner is None or p.device_owner == '' or + p.device_owner.startswith("compute:")): + ret.append(p) + return ret + + def _cleanupLeakedPorts(self): + if not self._last_port_cleanup: + self._last_port_cleanup = time.monotonic() + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + self._down_ports = set([p.id for p in ports]) + return + + # Return if not enough time has passed between cleanup + last_check_in_secs = int(time.monotonic() - self._last_port_cleanup) + if last_check_in_secs <= self.provider.port_cleanup_interval: + return + + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + current_set = set([p.id for p in ports]) + remove_set = current_set & self._down_ports + + removed_count = 0 + for port_id in remove_set: + try: + self._deletePort(port_id) + except Exception: + self.log.exception("Exception deleting port %s in %s:", + port_id, self.provider.name) + else: + removed_count += 1 + self.log.debug("Removed DOWN port %s in %s", + port_id, self.provider.name) + + if self._statsd and removed_count: + key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name) + self._statsd.incr(key, removed_count) + + self._last_port_cleanup = time.monotonic() + + # Rely on OpenStack to tell us the down ports rather than doing our + # own set adjustment. + ports = self._listPorts(status='DOWN') + ports = self._filterComputePorts(ports) + self._down_ports = set([p.id for p in ports]) + + def _deletePort(self, port_id): + self._client.delete_port(port_id) + + def _cleanupFloatingIps(self): + did_clean = self._client.delete_unattached_floating_ips() + if did_clean: + # some openstacksdk's return True if any port was + # cleaned, rather than the count. Just set it to 1 to + # indicate something happened. + if type(did_clean) == bool: + did_clean = 1 + if self._statsd: + key = ('nodepool.provider.%s.leaked.floatingips' + % self.provider.name) + self._statsd.incr(key, did_clean) diff --git a/nodepool/driver/openstack/config.py b/nodepool/driver/openstack/config.py index 6c39a6489..7d63794a7 100644 --- a/nodepool/driver/openstack/config.py +++ b/nodepool/driver/openstack/config.py @@ -29,11 +29,11 @@ class ProviderDiskImage(ConfigValue): self.config_drive = None self.connection_type = None self.connection_port = None + self.username = None + self.python_path = None + self.shell_type = None self.meta = None - def __repr__(self): - return "" % self.name - class ProviderCloudImage(ConfigValue): def __init__(self): @@ -47,9 +47,6 @@ class ProviderCloudImage(ConfigValue): self.connection_type = None self.connection_port = None - def __repr__(self): - return "" % self.name - @property def external_name(self): '''Human readable version of external.''' @@ -76,28 +73,6 @@ class ProviderLabel(ConfigValue): # The ProviderPool object that owns this label. self.pool = None - def __eq__(self, other): - if isinstance(other, ProviderLabel): - # NOTE(Shrews): We intentionally do not compare 'pool' here - # since this causes recursive checks with ProviderPool. 
- return (other.diskimage == self.diskimage and - other.cloud_image == self.cloud_image and - other.min_ram == self.min_ram and - other.flavor_name == self.flavor_name and - other.key_name == self.key_name and - other.name == self.name and - other.console_log == self.console_log and - other.boot_from_volume == self.boot_from_volume and - other.volume_size == self.volume_size and - other.instance_properties == self.instance_properties and - other.userdata == self.userdata and - other.networks == self.networks and - other.host_key_checking == self.host_key_checking) - return False - - def __repr__(self): - return "" % self.name - class ProviderPool(ConfigPool): ignore_equality = ['provider'] @@ -112,6 +87,7 @@ class ProviderPool(ConfigPool): self.security_groups = None self.auto_floating_ip = True self.host_key_checking = True + self.use_internal_ip = False self.labels = None # The OpenStackProviderConfig object that owns this pool. self.provider = None @@ -119,9 +95,6 @@ class ProviderPool(ConfigPool): # Initialize base class attributes super().__init__() - def __repr__(self): - return "" % self.name - def load(self, pool_config, full_config, provider): ''' Load pool configuration options. @@ -259,6 +232,9 @@ class OpenStackProviderConfig(ProviderConfig): diskimage.image_types.add(self.image_type) i.pause = bool(image.get('pause', False)) i.config_drive = image.get('config-drive', None) + i.username = diskimage.username + i.python_path = diskimage.python_path + i.shell_type = diskimage.shell_type i.connection_type = image.get('connection-type', 'ssh') i.connection_port = image.get( 'connection-port', diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py deleted file mode 100644 index fccb0cd01..000000000 --- a/nodepool/driver/openstack/handler.py +++ /dev/null @@ -1,477 +0,0 @@ -# Copyright (C) 2011-2014 OpenStack Foundation -# Copyright 2017 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import math -import pprint -import random - -from kazoo import exceptions as kze -import openstack - -from nodepool import exceptions -from nodepool import nodeutils as utils -from nodepool.zk import zookeeper as zk -from nodepool.driver.utils import NodeLauncher, QuotaInformation -from nodepool.driver import NodeRequestHandler - - -class OpenStackNodeLauncher(NodeLauncher): - def __init__(self, handler, node, provider_config, provider_label, - request): - ''' - Initialize the launcher. - - :param OpenStackNodeRequestHandler handler: The handler object. - :param Node node: A Node object describing the node to launch. - :param ProviderConfig provider_config: A ProviderConfig object - describing the provider launching this node. - :param ProviderLabel provider_label: A ProviderLabel object - describing the label to use for the node. - :param NodeRequest request: The NodeRequest that prompted the - launch. - ''' - super().__init__(handler, node, provider_config) - - # Number of times to retry failed launches. 
- self._retries = provider_config.launch_retries - - self.label = provider_label - self.pool = provider_label.pool - self.request = request - - def _logConsole(self, server_id, hostname): - if not self.label.console_log: - return - console = self.handler.manager.getServerConsole(server_id) - if console: - self.log.info('Console log from hostname %s:' % hostname) - for line in console.splitlines(): - self.log.info(line.rstrip()) - - def _launchNode(self): - if self.label.diskimage: - diskimage = self.provider_config.diskimages[ - self.label.diskimage.name] - else: - diskimage = None - - if diskimage: - # launch using diskimage - cloud_image = self.handler.zk.getMostRecentImageUpload( - diskimage.name, self.provider_config.name) - - if not cloud_image: - raise exceptions.LaunchNodepoolException( - "Unable to find current cloud image %s in %s" % - (diskimage.name, self.provider_config.name) - ) - - config_drive = diskimage.config_drive - # Using a dict with the ID bypasses an image search during - # server creation. - image_external = dict(id=cloud_image.external_id) - image_id = "{path}/{upload_id}".format( - path=self.handler.zk._imageUploadPath( - cloud_image.image_name, - cloud_image.build_id, - cloud_image.provider_name), - upload_id=cloud_image.id) - image_name = diskimage.name - username = cloud_image.username - python_path = cloud_image.python_path - shell_type = cloud_image.shell_type - connection_type = diskimage.connection_type - connection_port = diskimage.connection_port - - else: - # launch using unmanaged cloud image - config_drive = self.label.cloud_image.config_drive - - if self.label.cloud_image.image_id: - # Using a dict with the ID bypasses an image search during - # server creation. - image_external = dict(id=self.label.cloud_image.image_id) - else: - image_external = self.label.cloud_image.external_name - - image_id = self.label.cloud_image.name - image_name = self.label.cloud_image.name - username = self.label.cloud_image.username - python_path = self.label.cloud_image.python_path - shell_type = self.label.cloud_image.shell_type - connection_type = self.label.cloud_image.connection_type - connection_port = self.label.cloud_image.connection_port - - hostname = self.provider_config.hostname_format.format( - label=self.label, provider=self.provider_config, node=self.node - ) - - self.log.info( - "Creating server with hostname %s in %s from image %s" % ( - hostname, self.provider_config.name, image_name)) - - # NOTE: We store the node ID in the server metadata to use for leaked - # instance detection. We cannot use the external server ID for this - # because that isn't available in ZooKeeper until after the server is - # active, which could cause a race in leak detection. 
- - props = self.label.instance_properties.copy() - for k, v in self.label.dynamic_instance_properties.items(): - try: - props[k] = v.format(request=self.request.getSafeAttributes()) - except Exception: - self.log.exception( - "Error formatting dynamic instance property %s", k) - if not props: - props = None - - try: - server = self.handler.manager.createServer( - hostname, - image=image_external, - min_ram=self.label.min_ram, - flavor_name=self.label.flavor_name, - key_name=self.label.key_name, - az=self.node.az, - config_drive=config_drive, - nodepool_node_id=self.node.id, - nodepool_node_label=self.node.type[0], - nodepool_image_name=image_name, - nodepool_pool_name=self.node.pool, - networks=self.label.networks, - security_groups=self.pool.security_groups, - boot_from_volume=self.label.boot_from_volume, - volume_size=self.label.volume_size, - instance_properties=props, - userdata=self.label.userdata) - except openstack.cloud.exc.OpenStackCloudCreateException as e: - if e.resource_id: - self.node.external_id = e.resource_id - # The outer exception handler will handle storing the - # node immediately after this. - raise - - self.node.external_id = server.id - self.node.hostname = hostname - self.node.image_id = image_id - - pool = self.handler.provider.pools.get(self.node.pool) - resources = self.handler.manager.quotaNeededByLabel( - self.node.type[0], pool) - self.node.resources = resources.get_resources() - if username: - self.node.username = username - - self.node.python_path = python_path - self.node.shell_type = shell_type - self.node.connection_type = connection_type - self.node.connection_port = connection_port - - # Checkpoint save the updated node info - self.zk.storeNode(self.node) - - self.log.debug("Waiting for server %s" % server.id) - server = self.handler.manager.waitForServer( - server, self.provider_config.launch_timeout, - auto_ip=self.pool.auto_floating_ip) - - if server.status != 'ACTIVE': - raise exceptions.LaunchStatusException("Server %s for node id: %s " - "status: %s" % - (server.id, self.node.id, - server.status)) - - # If we didn't specify an AZ, set it to the one chosen by Nova. - # Do this after we are done waiting since AZ may not be available - # immediately after the create request. - if not self.node.az: - self.node.az = server.location.zone - - interface_ip = server.interface_ip - if not interface_ip: - self.log.debug( - "Server data for failed IP: %s" % pprint.pformat( - server)) - raise exceptions.LaunchNetworkException( - "Unable to find public IP of server") - - self.node.host_id = server.host_id - self.node.interface_ip = interface_ip - self.node.public_ipv4 = server.public_v4 - self.node.public_ipv6 = server.public_v6 - self.node.private_ipv4 = server.private_v4 - # devstack-gate multi-node depends on private_v4 being populated - # with something. On clouds that don't have a private address, use - # the public. 
- if not self.node.private_ipv4: - self.node.private_ipv4 = server.public_v4 - - # Checkpoint save the updated node info - self.zk.storeNode(self.node) - - self.log.debug( - "Node is running [region: %s, az: %s, ip: %s ipv4: %s, " - "ipv6: %s, hostid: %s]" % - (self.node.region, self.node.az, - self.node.interface_ip, self.node.public_ipv4, - self.node.public_ipv6, self.node.host_id)) - - # wait and scan the new node and record in ZooKeeper - host_keys = [] - if self.label.host_key_checking: - try: - self.log.debug("Gathering host keys") - # only gather host keys if the connection type is ssh or - # network_cli - gather_host_keys = ( - connection_type == 'ssh' or - connection_type == 'network_cli') - host_keys = utils.nodescan( - interface_ip, - timeout=self.provider_config.boot_timeout, - gather_hostkeys=gather_host_keys, - port=connection_port) - - if gather_host_keys and not host_keys: - raise exceptions.LaunchKeyscanException( - "Unable to gather host keys") - except exceptions.ConnectionTimeoutException: - self._logConsole(self.node.external_id, self.node.hostname) - raise - - self.node.host_keys = host_keys - self.zk.storeNode(self.node) - - def launch(self): - attempts = 1 - while attempts <= self._retries: - try: - self._launchNode() - break - except kze.SessionExpiredError: - # If we lost our ZooKeeper session, we've lost our node lock - # so there's no need to continue. - raise - except Exception as e: - if attempts <= self._retries: - self.log.exception("Launch attempt %d/%d failed:", - attempts, self._retries) - - # If we got an external id we need to fetch the server info - # again in order to retrieve the fault reason as this is not - # included in the server object we already have. - quota_exceeded = False - if self.node.external_id: - try: - server = self.handler.manager.getServerById( - self.node.external_id) or {} - fault = server.get('fault', {}).get('message') - if fault: - self.log.error('Detailed node error: %s', fault) - if 'quota' in fault: - quota_exceeded = True - except Exception: - self.log.exception( - 'Failed to retrieve node error information:') - - # If we created an instance, delete it. - if self.node.external_id: - deleting_node = zk.Node() - deleting_node.provider = self.node.provider - deleting_node.pool = self.node.pool - deleting_node.type = self.node.type - deleting_node.external_id = self.node.external_id - deleting_node.state = zk.DELETING - self.zk.storeNode(deleting_node) - self.log.info("Node %s scheduled for cleanup", - deleting_node.external_id) - self.node.external_id = None - self.node.public_ipv4 = None - self.node.public_ipv6 = None - self.node.interface_ip = None - self.zk.storeNode(self.node) - if attempts == self._retries: - raise - if 'quota exceeded' in str(e).lower(): - quota_exceeded = True - if 'number of ports exceeded' in str(e).lower(): - quota_exceeded = True - - if quota_exceeded: - # A quota exception is not directly recoverable so bail - # out immediately with a specific exception. 
- self.log.info("Quota exceeded, invalidating quota cache") - self.handler.manager.invalidateQuotaCache() - raise exceptions.QuotaException("Quota exceeded") - attempts += 1 - - self.node.state = zk.READY - self.zk.storeNode(self.node) - self.log.info("Node is ready") - - -class OpenStackNodeRequestHandler(NodeRequestHandler): - - def __init__(self, pw, request): - super().__init__(pw, request) - self.chosen_az = None - self._threads = [] - - @property - def alive_thread_count(self): - count = 0 - for t in self._threads: - if t.is_alive(): - count += 1 - return count - - def imagesAvailable(self): - ''' - Determines if the requested images are available for this provider. - - ZooKeeper is queried for an image uploaded to the provider that is - in the READY state. - - :returns: True if it is available, False otherwise. - ''' - if self.provider.manage_images: - for label in self.request.node_types: - if self.pool.labels[label].cloud_image: - if not self.manager.labelReady(self.pool.labels[label]): - return False - else: - if not self.zk.getMostRecentImageUpload( - self.pool.labels[label].diskimage.name, - self.provider.name): - return False - return True - - def hasRemainingQuota(self, ntype): - needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool) - - if not self.pool.ignore_provider_quota: - # Calculate remaining quota which is calculated as: - # quota = - - - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed()) - cloud_quota.subtract(needed_quota) - self.log.debug("Predicted remaining provider quota: %s", - cloud_quota) - - if not cloud_quota.non_negative(): - return False - - # Now calculate pool specific quota. Values indicating no quota default - # to math.inf representing infinity that can be calculated with. - pool_quota = QuotaInformation(cores=self.pool.max_cores, - instances=self.pool.max_servers, - ram=self.pool.max_ram, - default=math.inf) - pool_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed(self.pool)) - self.log.debug("Current pool quota: %s" % pool_quota) - pool_quota.subtract(needed_quota) - self.log.debug("Predicted remaining pool quota: %s", pool_quota) - - return pool_quota.non_negative() - - def hasProviderQuota(self, node_types): - needed_quota = QuotaInformation() - - for ntype in node_types: - needed_quota.add( - self.manager.quotaNeededByLabel(ntype, self.pool)) - - if not self.pool.ignore_provider_quota: - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract(needed_quota) - - if not cloud_quota.non_negative(): - return False - - # Now calculate pool specific quota. Values indicating no quota default - # to math.inf representing infinity that can be calculated with. - pool_quota = QuotaInformation(cores=self.pool.max_cores, - instances=self.pool.max_servers, - ram=self.pool.max_ram, - default=math.inf) - pool_quota.subtract(needed_quota) - return pool_quota.non_negative() - - def checkReusableNode(self, node): - if self.chosen_az and node.az != self.chosen_az: - return False - return True - - def nodeReusedNotification(self, node): - """ - We attempt to group the node set within the same provider availability - zone. - For this to work properly, the provider entry in the nodepool - config must list the availability zones. Otherwise, new nodes will be - put in random AZs at nova's whim. The exception being if there is an - existing node in the READY state that we can select for this node set. 
- Its AZ will then be used for new nodes, as well as any other READY - nodes. - """ - # If we haven't already chosen an AZ, select the - # AZ from this ready node. This will cause new nodes - # to share this AZ, as well. - if not self.chosen_az and node.az: - self.chosen_az = node.az - - def setNodeMetadata(self, node): - """ - Select grouping AZ if we didn't set AZ from a selected, - pre-existing node - """ - if not self.chosen_az: - self.chosen_az = random.choice( - self.pool.azs or self.manager.getAZs()) - node.az = self.chosen_az - node.cloud = self.provider.cloud_config.name - node.region = self.provider.region_name - - def launchesComplete(self): - ''' - Check if all launch requests have completed. - - When all of the Node objects have reached a final state (READY, FAILED - or ABORTED), we'll know all threads have finished the launch process. - ''' - if not self._threads: - return True - - # Give the NodeLaunch threads time to finish. - if self.alive_thread_count: - return False - - node_states = [node.state for node in self.nodeset] - - # NOTE: It's very important that NodeLauncher always sets one of - # these states, no matter what. - if not all(s in (zk.READY, zk.FAILED, zk.ABORTED) - for s in node_states): - return False - - return True - - def launch(self, node): - label = self.pool.labels[node.type[0]] - thd = OpenStackNodeLauncher(self, node, self.provider, label, - self.request) - thd.start() - self._threads.append(thd) diff --git a/nodepool/driver/openstack/provider.py b/nodepool/driver/openstack/provider.py deleted file mode 100755 index ed1947e0b..000000000 --- a/nodepool/driver/openstack/provider.py +++ /dev/null @@ -1,659 +0,0 @@ -# Copyright (C) 2011-2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. 
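The remaining-quota arithmetic removed from the handler above is carried over (behind an `ignore-provider-quota` guard) into `nodepool/driver/statemachine.py` later in this diff. A simplified, self-contained sketch of that calculation follows; the trimmed `QuotaInformation` stand-in and the numbers are illustrative only, not the real `nodepool.driver.utils` implementation:

```python
import math

class QuotaInformation:
    """Minimal stand-in for nodepool.driver.utils.QuotaInformation."""
    def __init__(self, cores=None, instances=None, ram=None, default=0):
        self.quota = {
            'cores': cores if cores is not None else default,
            'instances': instances if instances is not None else default,
            'ram': ram if ram is not None else default,
        }

    def subtract(self, other):
        for key in self.quota:
            self.quota[key] -= other.quota[key]

    def non_negative(self):
        return all(v >= 0 for v in self.quota.values())

# quota = <total> - <in use> - <needed by this label>
needed = QuotaInformation(cores=8, instances=1, ram=8192)

cloud = QuotaInformation(cores=200, instances=100, ram=512000)
cloud.subtract(QuotaInformation(cores=180, instances=90, ram=400000))  # in use
cloud.subtract(needed)
print(cloud.non_negative())  # True: 12 cores, 9 instances, ~100 GB RAM remain

# Pool limits left unset default to math.inf so they never constrain.
pool = QuotaInformation(instances=10, default=math.inf)
pool.subtract(needed)
print(pool.non_negative())  # True
```

Both the provider-level and the pool-level results must stay non-negative for the request to be accepted.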
- -import logging -import operator -import threading -import time - -import openstack -from openstack.exceptions import ResourceTimeout - -from nodepool import exceptions -from nodepool.driver import Provider -from nodepool.driver.utils import QuotaInformation, QuotaSupport -from nodepool.driver.utils import NodeDeleter -from nodepool import stats -from nodepool import version -from nodepool.zk import zookeeper as zk - -# Import entire module to avoid partial-loading, circular import -from nodepool.driver.openstack import handler - - -IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list - - -class OpenStackProvider(Provider, QuotaSupport): - log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider") - - def __init__(self, provider): - super().__init__() - self.provider = provider - self._images = {} - self._networks = {} - self.__flavors = {} # TODO(gtema): caching - self.__azs = None - self._zk = None - self._down_ports = set() - self._last_port_cleanup = None - self._statsd = stats.get_client() - self.running = False - self._server_list_watcher = threading.Thread( - name='ServerListWatcher', target=self._watchServerList, - daemon=True) - self._server_list_watcher_stop_event = threading.Event() - self._cleanup_queue = {} - self._startup_queue = {} - - def start(self, zk_conn): - self.resetClient() - self._zk = zk_conn - self.running = True - self._server_list_watcher.start() - - def stop(self): - self.running = False - self._server_list_watcher_stop_event.set() - - def idle(self): - pass - - def join(self): - self._server_list_watcher.join() - - def getRequestHandler(self, poolworker, request): - return handler.OpenStackNodeRequestHandler(poolworker, request) - - # TODO(gtema): caching - @property - def _flavors(self): - if not self.__flavors: - self.__flavors = self._getFlavors() - return self.__flavors - - def _getClient(self): - rate_limit = None - # nodepool tracks rate limit in time between requests. - # openstacksdk tracks rate limit in requests per second. - # 1/time = requests-per-second. - if self.provider.rate: - rate_limit = 1 / self.provider.rate - return openstack.connection.Connection( - config=self.provider.cloud_config, - use_direct_get=False, - rate_limit=rate_limit, - app_name='nodepool', - app_version=version.version_info.version_string() - ) - - def getProviderLimits(self): - limits = self._client.get_compute_limits() - return QuotaInformation.construct_from_limits(limits) - - def quotaNeededByLabel(self, ntype, pool): - provider_label = pool.labels[ntype] - - flavor = self.findFlavor(provider_label.flavor_name, - provider_label.min_ram) - - return QuotaInformation.construct_from_flavor(flavor) - - def unmanagedQuotaUsed(self): - ''' - Sums up the quota used by servers unmanaged by nodepool. - - :return: Calculated quota in use by unmanaged servers - ''' - flavors = self.listFlavorsById() - used_quota = QuotaInformation() - - node_ids = set([n.id for n in self._zk.nodeIterator()]) - - for server in self.listNodes(): - meta = server.get('metadata', {}) - - nodepool_provider_name = meta.get('nodepool_provider_name') - if (nodepool_provider_name and - nodepool_provider_name == self.provider.name): - # This provider (regardless of the launcher) owns this - # server so it must not be accounted for unmanaged - # quota; unless it has leaked. - nodepool_node_id = meta.get('nodepool_node_id') - # FIXME(tobiash): Add a test case for this - if nodepool_node_id and nodepool_node_id in node_ids: - # It has not leaked. 
- continue - - # In earlier versions of nova or the sdk, flavor has just an id. - # In later versions it returns the information we're looking for. - # If we get the information we want, we do not need to try to - # lookup the flavor in our list. - if hasattr(server.flavor, 'vcpus'): - flavor = server.flavor - else: - flavor = flavors.get(server.flavor.id) - # If we still haven't found the flavor, skip handling this - # server instead of failing completely - if not flavor: - continue - used_quota.add(QuotaInformation.construct_from_flavor(flavor)) - - return used_quota - - def resetClient(self): - self._client = self._getClient() - - def _getFlavors(self): - flavors = self.listFlavors() - flavors.sort(key=operator.itemgetter('ram')) - return flavors - - # TODO(gtema): These next three methods duplicate logic that is in - # openstacksdk, caching is not enabled there by default - # Remove it when caching is default - def _findFlavorByName(self, flavor_name): - for f in self._flavors: - if flavor_name in (f['name'], f['id']): - return f - raise Exception("Unable to find flavor: %s" % flavor_name) - - def _findFlavorByRam(self, min_ram, flavor_name): - for f in self._flavors: - if (f['ram'] >= min_ram - and (not flavor_name or flavor_name in f['name'])): - return f - raise Exception("Unable to find flavor with min ram: %s" % min_ram) - - def findFlavor(self, flavor_name, min_ram): - # Note: this will throw an error if the provider is offline - # but all the callers are in threads (they call in via CreateServer) so - # the mainloop won't be affected. - # TODO(gtema): enable commented block when openstacksdk has caching - # enabled by default - # if min_ram: - # return self._client.get_flavor_by_ram( - # ram=min_ram, - # include=flavor_name, - # get_extra=False) - # else: - # return self._client.get_flavor(flavor_name, get_extra=False) - - if min_ram: - return self._findFlavorByRam(min_ram, flavor_name) - else: - return self._findFlavorByName(flavor_name) - - def findImage(self, name): - if name in self._images: - return self._images[name] - - image = self._client.get_image(name, filters={'status': 'active'}) - self._images[name] = image - return image - - def findNetwork(self, name): - if name in self._networks: - return self._networks[name] - - network = self._client.get_network(name) - if not network: - raise Exception("Unable to find network %s in provider %s" % ( - name, self.provider.name)) - self._networks[name] = network - return network - - def deleteImage(self, name, id): - if name in self._images: - del self._images[name] - - return self._client.delete_image(id) - - def createServer(self, name, image, - flavor_name=None, min_ram=None, - az=None, key_name=None, config_drive=True, - nodepool_node_id=None, nodepool_node_label=None, - nodepool_image_name=None, - nodepool_pool_name=None, - networks=None, security_groups=None, - boot_from_volume=False, volume_size=50, - instance_properties=None, userdata=None): - if not networks: - networks = [] - if not isinstance(image, dict): - # if it's a dict, we already have the cloud id. If it's not, - # we don't know if it's name or ID so need to look it up - image = self.findImage(image) - flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram) - create_args = dict(name=name, - image=image, - flavor=flavor, - config_drive=config_drive) - if boot_from_volume: - create_args['boot_from_volume'] = boot_from_volume - create_args['volume_size'] = volume_size - # NOTE(pabelanger): Always cleanup volumes when we delete a server. 
- create_args['terminate_volume'] = True - if key_name: - create_args['key_name'] = key_name - if az: - create_args['availability_zone'] = az - if security_groups: - create_args['security_groups'] = security_groups - if userdata: - create_args['userdata'] = userdata - nics = [] - for network in networks: - net_id = self.findNetwork(network)['id'] - nics.append({'net-id': net_id}) - if nics: - create_args['nics'] = nics - # Put provider.name and image_name in as groups so that ansible - # inventory can auto-create groups for us based on each of those - # qualities - # Also list each of those values directly so that non-ansible - # consumption programs don't need to play a game of knowing that - # groups[0] is the image name or anything silly like that. - groups_list = [self.provider.name] - - if nodepool_image_name: - groups_list.append(nodepool_image_name) - if nodepool_node_label: - groups_list.append(nodepool_node_label) - meta = dict( - groups=",".join(groups_list), - nodepool_provider_name=self.provider.name, - nodepool_pool_name=nodepool_pool_name, - ) - # merge in any provided properties - if instance_properties: - meta = {**instance_properties, **meta} - if nodepool_node_id: - meta['nodepool_node_id'] = nodepool_node_id - if nodepool_image_name: - meta['nodepool_image_name'] = nodepool_image_name - if nodepool_node_label: - meta['nodepool_node_label'] = nodepool_node_label - create_args['meta'] = meta - - try: - return self._client.create_server(wait=False, **create_args) - except openstack.exceptions.BadRequestException: - # We've gotten a 400 error from nova - which means the request - # was malformed. The most likely cause of that, unless something - # became functionally and systemically broken, is stale az, image - # or flavor cache. Log a message, invalidate the caches so that - # next time we get new caches. - self._images = {} - self.__azs = None - self.__flavors = {} # TODO(gtema): caching - self.log.info( - "Clearing az, flavor and image caches due to 400 error " - "from nova") - raise - - def getServer(self, server_id): - return self._client.get_server(server_id) - - def getServerById(self, server_id): - return self._client.get_server_by_id(server_id) - - def getServerConsole(self, server_id): - try: - return self._client.get_server_console(server_id) - except openstack.exceptions.OpenStackCloudException: - return None - - def waitForServer(self, server, timeout=3600, auto_ip=True): - # This method is called from a separate thread per server. In order to - # reduce thread contention we don't call wait_for_server right now - # but put this thread on sleep until the desired instance is either - # in ACTIVE or ERROR state. After that just continue with - # wait_for_server which will continue its magic. - # TODO: log annotation - self.log.debug('Wait for central server creation %s', server.id) - event = threading.Event() - start_time = time.monotonic() - self._startup_queue[server.id] = (event, start_time + timeout) - if not event.wait(timeout=timeout): - # On timeout emit the same exception as wait_for_server would to - timeout_message = "Timeout waiting for the server to come up." 
- raise ResourceTimeout(timeout_message) - - # TODO: log annotation - self.log.debug('Finished wait for central server creation %s', - server.id) - - # Re-calculate timeout to account for the duration so far - elapsed = time.monotonic() - start_time - timeout = max(0, timeout - elapsed) - - return self._client.wait_for_server( - server=server, auto_ip=auto_ip, - reuse=False, timeout=timeout) - - def waitForNodeCleanup(self, server_id, timeout=600): - event = threading.Event() - self._cleanup_queue[server_id] = (event, time.monotonic() + timeout) - if not event.wait(timeout=timeout): - raise exceptions.ServerDeleteException( - "server %s deletion" % server_id) - - def createImage(self, server, image_name, meta): - return self._client.create_image_snapshot( - image_name, server, **meta) - - def getImage(self, image_id): - return self._client.get_image(image_id, filters={'status': 'active'}) - - def labelReady(self, label): - if not label.cloud_image: - return False - - # If an image ID was supplied, we'll assume it is ready since - # we don't currently have a way of validating that (except during - # server creation). - if label.cloud_image.image_id: - return True - - image = self.getImage(label.cloud_image.external_name) - if not image: - self.log.warning( - "Provider %s is configured to use %s as the" - " cloud-image for label %s and that" - " cloud-image could not be found in the" - " cloud." % (self.provider.name, - label.cloud_image.external_name, - label.name)) - return False - return True - - def uploadImage(self, provider_image, image_name, filename, - image_type=None, meta=None, md5=None, sha256=None): - # configure glance and upload image. Note the meta flags - # are provided as custom glance properties - # NOTE: we have wait=True set here. This is not how we normally - # do things in nodepool, preferring to poll ourselves thankyouverymuch. - # However - two things to note: - # - PUT has no aysnc mechanism, so we have to handle it anyway - # - v2 w/task waiting is very strange and complex - but we have to - # block for our v1 clouds anyway, so we might as well - # have the interface be the same and treat faking-out - # a openstacksdk-level fake-async interface later - if not meta: - meta = {} - if image_type: - meta['disk_format'] = image_type - image = self._client.create_image( - name=image_name, - filename=filename, - is_public=False, - wait=True, - md5=md5, - sha256=sha256, - **meta) - return image.id - - def listPorts(self, status=None): - ''' - List known ports. - - :param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'. 
- ''' - if status: - ports = self._client.list_ports(filters={'status': status}) - else: - ports = self._client.list_ports() - return ports - - def deletePort(self, port_id): - self._client.delete_port(port_id) - - def listImages(self): - return self._client.list_images() - - def listFlavors(self): - return self._client.list_flavors(get_extra=False) - - def listFlavorsById(self): - flavors = {} - for flavor in self._client.list_flavors(get_extra=False): - flavors[flavor.id] = flavor - return flavors - - def listNodes(self): - # list_servers carries the nodepool server list caching logic - return self._client.list_servers() - - def deleteServer(self, server_id): - return self._client.delete_server(server_id, delete_ips=True) - - def startNodeCleanup(self, node): - t = NodeDeleter(self._zk, self, node) - t.start() - return t - - def cleanupNode(self, server_id): - server = self.getServer(server_id) - if not server: - raise exceptions.NotFound() - - self.log.debug('Deleting server %s' % server_id) - self.deleteServer(server_id) - - def cleanupLeakedInstances(self): - ''' - Delete any leaked server instances. - - Remove any servers found in this provider that are not recorded in - the ZooKeeper data. - ''' - - deleting_nodes = {} - - for node in self._zk.nodeIterator(): - if node.state == zk.DELETING: - if node.provider != self.provider.name: - continue - if node.provider not in deleting_nodes: - deleting_nodes[node.provider] = [] - deleting_nodes[node.provider].append(node.external_id) - - for server in self._client.list_servers(bare=True): - meta = server.get('metadata', {}) - - if 'nodepool_provider_name' not in meta: - continue - - if meta['nodepool_provider_name'] != self.provider.name: - # Another launcher, sharing this provider but configured - # with a different name, owns this. - continue - - if (self.provider.name in deleting_nodes and - server.id in deleting_nodes[self.provider.name]): - # Already deleting this node - continue - - if not self._zk.getNode(meta['nodepool_node_id']): - self.log.warning( - "Marking for delete leaked instance %s (%s) in %s " - "(unknown node id %s)", - server.name, server.id, self.provider.name, - meta['nodepool_node_id'] - ) - # Create an artifical node to use for deleting the server. - node = zk.Node() - node.external_id = server.id - node.provider = self.provider.name - node.pool = meta.get('nodepool_pool_name') - node.state = zk.DELETING - self._zk.storeNode(node) - if self._statsd: - key = ('nodepool.provider.%s.leaked.nodes' - % self.provider.name) - self._statsd.incr(key) - - def filterComputePorts(self, ports): - ''' - Return a list of compute ports (or no device owner). - - We are not interested in ports for routers or DHCP. 
- ''' - ret = [] - for p in ports: - if (p.device_owner is None or p.device_owner == '' or - p.device_owner.startswith("compute:")): - ret.append(p) - return ret - - def cleanupLeakedPorts(self): - if not self._last_port_cleanup: - self._last_port_cleanup = time.monotonic() - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - self._down_ports = set([p.id for p in ports]) - return - - # Return if not enough time has passed between cleanup - last_check_in_secs = int(time.monotonic() - self._last_port_cleanup) - if last_check_in_secs <= self.provider.port_cleanup_interval: - return - - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - current_set = set([p.id for p in ports]) - remove_set = current_set & self._down_ports - - removed_count = 0 - for port_id in remove_set: - try: - self.deletePort(port_id) - except Exception: - self.log.exception("Exception deleting port %s in %s:", - port_id, self.provider.name) - else: - removed_count += 1 - self.log.debug("Removed DOWN port %s in %s", - port_id, self.provider.name) - - if self._statsd and removed_count: - key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name) - self._statsd.incr(key, removed_count) - - self._last_port_cleanup = time.monotonic() - - # Rely on OpenStack to tell us the down ports rather than doing our - # own set adjustment. - ports = self.listPorts(status='DOWN') - ports = self.filterComputePorts(ports) - self._down_ports = set([p.id for p in ports]) - - def cleanupLeakedResources(self): - self.cleanupLeakedInstances() - if self.provider.port_cleanup_interval: - self.cleanupLeakedPorts() - if self.provider.clean_floating_ips: - did_clean = self._client.delete_unattached_floating_ips() - if did_clean: - # some openstacksdk's return True if any port was - # cleaned, rather than the count. Just set it to 1 to - # indicate something happened. - if type(did_clean) == bool: - did_clean = 1 - if self._statsd: - key = ('nodepool.provider.%s.leaked.floatingips' - % self.provider.name) - self._statsd.incr(key, did_clean) - - def getAZs(self): - if self.__azs is None: - self.__azs = self._client.list_availability_zone_names() - if not self.__azs: - # If there are no zones, return a list containing None so that - # random.choice can pick None and pass that to Nova. If this - # feels dirty, please direct your ire to policy.json and the - # ability to turn off random portions of the OpenStack API. - self.__azs = [None] - return self.__azs - - def _watchServerList(self): - log = logging.getLogger( - "nodepool.driver.openstack.OpenStackProvider.watcher") - while self.running: - if self._server_list_watcher_stop_event.wait(5): - # We're stopping now so don't wait with any thread for node - # deletion. 
- for event, _ in self._cleanup_queue.values(): - event.set() - for event, _ in self._startup_queue.values(): - event.set() - break - - if not self._cleanup_queue and not self._startup_queue: - # No server deletion to wait for so check can be skipped - continue - - try: - log.debug('Get server list') - start = time.monotonic() - # List bare to avoid neutron calls - servers = self._client.list_servers(bare=True) - log.debug('Got server list in %.3fs', time.monotonic() - start) - except Exception: - log.exception('Failed to get server list') - continue - - def process_timeouts(queue): - for server_id in list(queue.keys()): - # Remove entries that are beyond timeout - _, timeout = queue[server_id] - if time.monotonic() > timeout: - del queue[server_id] - - # Process cleanup queue - existing_server_ids = { - server.id for server in servers - if server.status != 'DELETED' - } - for server_id in list(self._cleanup_queue.keys()): - # Notify waiting threads that don't have server ids - if server_id not in existing_server_ids: - # Notify the thread which is waiting for the delete - log.debug('Waking up cleanup thread for server %s', - server_id) - self._cleanup_queue[server_id][0].set() - del self._cleanup_queue[server_id] - - # Process startup queue - finished_server_ids = { - server.id for server in servers - if server.status in ('ACTIVE', 'ERROR') - } - for server_id in list(self._startup_queue.keys()): - # Notify waiting threads that don't have server ids - if server_id in finished_server_ids: - # Notify the thread which is waiting for the delete - log.debug('Waking up startup thread for server %s', - server_id) - self._startup_queue[server_id][0].set() - del self._startup_queue[server_id] - - # Process timeouts - process_timeouts(self._cleanup_queue) - process_timeouts(self._startup_queue) - - log.debug('Done') diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index ed5d289a9..d2b344c9f 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -17,6 +17,7 @@ import time import logging import math +import random import threading from concurrent.futures.thread import ThreadPoolExecutor @@ -137,7 +138,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): 'nodepool_provider_name': self.manager.provider.name} self.state_machine = self.manager.adapter.getCreateStateMachine( hostname, label, image_external_id, metadata, retries, - self.handler.request, self.log) + self.handler.request, self.handler.chosen_az, self.log) def updateNodeFromInstance(self, instance): if instance is None: @@ -157,6 +158,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): node.public_ipv4 = instance.public_ipv4 node.private_ipv4 = instance.private_ipv4 node.public_ipv6 = instance.public_ipv6 + node.host_id = instance.host_id node.cloud = instance.cloud node.region = instance.region node.az = instance.az @@ -205,20 +207,17 @@ class StateMachineNodeLauncher(stats.StatsReporter): instance = state_machine.advance() self.log.debug(f"State machine for {node.id} at " f"{state_machine.state}") - if not node.external_id: - if not state_machine.external_id: - raise Exception("Driver implementation error: state " - "machine must produce external ID " - "after first advancement") + if not node.external_id and state_machine.external_id: node.external_id = state_machine.external_id self.zk.storeNode(node) if state_machine.complete and not self.keyscan_future: self.updateNodeFromInstance(instance) self.log.debug("Submitting keyscan request for %s", node.interface_ip) + 
label = self.handler.pool.labels[self.node.type[0]] future = self.manager.keyscan_worker.submit( keyscan, - self.handler.pool.host_key_checking, + label.host_key_checking, node.id, node.interface_ip, node.connection_type, node.connection_port, self.manager.provider.boot_timeout) @@ -240,6 +239,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): node.external_id = state_machine.external_id self.zk.storeNode(node) statsd_key = 'error.quota' + self.manager.invalidateQuotaCache() except Exception as e: self.log.exception( "Launch failed for node %s:", node.id) @@ -352,6 +352,7 @@ class StateMachineHandler(NodeRequestHandler): def __init__(self, pw, request): super().__init__(pw, request) + self.chosen_az = None self.launchers = [] @property @@ -364,6 +365,15 @@ class StateMachineHandler(NodeRequestHandler): :returns: True if it is available, False otherwise. ''' + for label in self.request.node_types: + if self.pool.labels[label].cloud_image: + if not self.manager.labelReady(self.pool.labels[label]): + return False + else: + if not self.zk.getMostRecentImageUpload( + self.pool.labels[label].diskimage.name, + self.provider.name): + return False return True def hasProviderQuota(self, node_types): @@ -414,17 +424,18 @@ class StateMachineHandler(NodeRequestHandler): needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool) self.log.debug("Needed quota: %s", needed_quota) - # Calculate remaining quota which is calculated as: - # quota = - - - cloud_quota = self.manager.estimatedNodepoolQuota() - cloud_quota.subtract( - self.manager.estimatedNodepoolQuotaUsed()) - cloud_quota.subtract(needed_quota) - self.log.debug("Predicted remaining provider quota: %s", - cloud_quota) + if not self.pool.ignore_provider_quota: + # Calculate remaining quota which is calculated as: + # quota = - - + cloud_quota = self.manager.estimatedNodepoolQuota() + cloud_quota.subtract( + self.manager.estimatedNodepoolQuotaUsed()) + cloud_quota.subtract(needed_quota) + self.log.debug("Predicted remaining provider quota: %s", + cloud_quota) - if not cloud_quota.non_negative(): - return False + if not cloud_quota.non_negative(): + return False # Now calculate pool specific quota. Values indicating no quota default # to math.inf representing infinity that can be calculated with. @@ -444,6 +455,37 @@ class StateMachineHandler(NodeRequestHandler): return pool_quota.non_negative() + def checkReusableNode(self, node): + if self.chosen_az and node.az != self.chosen_az: + return False + return True + + def nodeReusedNotification(self, node): + """ + We attempt to group the node set within the same provider availability + zone. + For this to work properly, the provider entry in the nodepool + config must list the availability zones. Otherwise, new node placement + will be determined by the cloud. The exception being if there is an + existing node in the READY state that we can select for this node set. + Its AZ will then be used for new nodes, as well as any other READY + nodes. + """ + # If we haven't already chosen an AZ, select the + # AZ from this ready node. This will cause new nodes + # to share this AZ, as well. + if not self.chosen_az and node.az: + self.chosen_az = node.az + + def setNodeMetadata(self, node): + """ + Select grouping AZ if we didn't set AZ from a selected, + pre-existing node + """ + if not self.chosen_az: + self.chosen_az = random.choice( + self.pool.azs or self.manager.adapter.getAZs()) + def launchesComplete(self): ''' Check if all launch requests have completed. 
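The AZ-grouping hooks added to `StateMachineHandler` above mirror the behaviour of the removed OpenStack handler: a reused READY node (or the first random choice) fixes `chosen_az`, and that value is what the launcher now passes as the new `az` argument to `getCreateStateMachine()`. A rough sketch of the selection flow, using stand-in handler and adapter classes that are assumptions for illustration only:

```python
import random

class ExampleAdapter:
    """Stand-in adapter; a real one would query the cloud."""
    def getAZs(self):
        # Returning [None] would mean "let the cloud choose the placement".
        return ['az1', 'az2', 'az3']

class ExampleHandler:
    def __init__(self, pool_azs, adapter):
        self.chosen_az = None
        self.pool_azs = pool_azs   # pool.azs from the config, may be None
        self.adapter = adapter

    def nodeReusedNotification(self, node_az):
        # A reused READY node pins the AZ for the rest of the node set.
        if not self.chosen_az and node_az:
            self.chosen_az = node_az

    def checkReusableNode(self, node_az):
        # Only reuse nodes that already sit in the chosen AZ.
        return not (self.chosen_az and node_az != self.chosen_az)

    def setNodeMetadata(self):
        # Pool-configured AZs win; otherwise ask the adapter.
        if not self.chosen_az:
            self.chosen_az = random.choice(
                self.pool_azs or self.adapter.getAZs())

handler = ExampleHandler(pool_azs=None, adapter=ExampleAdapter())
handler.setNodeMetadata()
print(handler.chosen_az)  # e.g. 'az2'; later handed to getCreateStateMachine()
```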
@@ -495,9 +537,11 @@ class StateMachineProvider(Provider, QuotaSupport): super().start(zk_conn) self.running = True self._zk = zk_conn - self.keyscan_worker = ThreadPoolExecutor() + self.keyscan_worker = ThreadPoolExecutor( + thread_name_prefix=f'keyscan-{self.provider.name}') self.state_machine_thread = threading.Thread( - target=self._runStateMachines) + target=self._runStateMachines, + daemon=True) self.state_machine_thread.start() def stop(self): @@ -555,7 +599,7 @@ class StateMachineProvider(Provider, QuotaSupport): return StateMachineHandler(poolworker, request) def labelReady(self, label): - return True + return self.adapter.labelReady(label) def getProviderLimits(self): try: @@ -745,6 +789,7 @@ class Instance: * cloud: str * az: str * region: str + * host_id: str * driver_data: any * slot: int @@ -769,6 +814,7 @@ class Instance: self.cloud = None self.az = None self.region = None + self.host_id = None self.metadata = {} self.driver_data = None self.slot = None @@ -951,6 +997,36 @@ class Adapter: """ return QuotaInformation(instances=1) + def getAZs(self): + """Return a list of availability zones for this provider + + One of these will be selected at random and supplied to the + create state machine. If a request handler is building a node + set from an existing ready node, then the AZ from that node + will be used instead of the results of this method. + + :returns: A list of availability zone names. + """ + return [None] + + def labelReady(self, label): + """Indicate whether a label is ready in the provided cloud + + This is used by the launcher to determine whether it should + consider a label to be in-service for a provider. If this + returns False, the label will be ignored for this provider. + + This does not need to consider whether a diskimage is ready; + the launcher handles that itself. Instead, this can be used + to determine whether a cloud-image is available. + + :param ProviderLabel label: A config object describing a label + for an instance. + + :returns: A bool indicating whether the label is ready. 
+ """ + return True + # The following methods must be implemented only if image # management is supported: diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index f49593f87..820ed1b61 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -287,6 +287,10 @@ class BaseTestCase(testtools.TestCase): continue if t.name.startswith("ThreadPoolExecutor"): continue + if t.name.startswith("openstack-api"): + continue + if t.name.startswith("keyscan"): + continue if t.name not in whitelist: done = False if done: @@ -584,7 +588,11 @@ class DBTestCase(BaseTestCase): for _ in iterate_timeout(ONE_MINUTE, Exception, "Cloud instance deletion", interval=1): - servers = manager.listNodes() + if hasattr(manager, 'adapter'): + servers = manager.adapter._listServers() + else: + # TODO: remove once all drivers use statemachine + servers = manager.listNodes() if not (instance_id in [s.id for s in servers]): break diff --git a/nodepool/tests/fixtures/node_auto_floating_ip.yaml b/nodepool/tests/fixtures/node_auto_floating_ip.yaml index 84051ed0f..e6ddde11a 100644 --- a/nodepool/tests/fixtures/node_auto_floating_ip.yaml +++ b/nodepool/tests/fixtures/node_auto_floating_ip.yaml @@ -34,7 +34,7 @@ providers: - name: main max-servers: 96 networks: - - 'some-name' + - 'no-auto-ip-network-name' auto-floating-ip: False labels: - name: fake-label1 diff --git a/nodepool/tests/unit/test_builder.py b/nodepool/tests/unit/test_builder.py index 4566f7dfc..77d007ce3 100644 --- a/nodepool/tests/unit/test_builder.py +++ b/nodepool/tests/unit/test_builder.py @@ -20,7 +20,7 @@ import mock import time from nodepool import builder, tests -from nodepool.driver.fake import provider as fakeprovider +from nodepool.driver.fake import adapter as fakeadapter from nodepool.zk import zookeeper as zk from nodepool.config import Config from nodepool.nodeutils import iterate_timeout @@ -173,13 +173,13 @@ class TestNodePoolBuilder(tests.DBTestCase): """Test that image upload fails are handled properly.""" # Now swap out the upload fake so that the next uploads fail - fake_client = fakeprovider.FakeUploadFailCloud(times_to_fail=1) + fake_client = fakeadapter.FakeUploadFailCloud(times_to_fail=1) def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node.yaml') @@ -264,13 +264,13 @@ class TestNodePoolBuilder(tests.DBTestCase): def test_image_removal_dib_deletes_first(self): # Break cloud image deleting - fake_client = fakeprovider.FakeDeleteImageFailCloud() + fake_client = fakeadapter.FakeDeleteImageFailCloud() def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_two_image.yaml') diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index a395fabcc..4b8c79d32 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -121,7 +121,7 @@ class TestNodepoolCMD(tests.DBTestCase): def fail_list(self): raise RuntimeError('Fake list error') self.useFixture(fixtures.MonkeyPatch( - 'nodepool.driver.fake.provider.FakeOpenStackCloud.list_servers', + 'nodepool.driver.fake.adapter.FakeOpenStackCloud.list_servers', fail_list)) configfile = self.setup_config("node_cmd.yaml") diff --git 
a/nodepool/tests/unit/test_driver_metastatic.py b/nodepool/tests/unit/test_driver_metastatic.py index c57627873..ef2a3e92e 100644 --- a/nodepool/tests/unit/test_driver_metastatic.py +++ b/nodepool/tests/unit/test_driver_metastatic.py @@ -86,7 +86,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -151,7 +151,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -166,7 +166,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Allocate a second node, should have same backing node node2 = self._requestNode() @@ -188,7 +188,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Request a node, verify that there is a backing node, and it # has the same connection info @@ -209,7 +209,7 @@ class TestDriverMetastatic(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") # Delete the metastatic node and verify that backing is deleted node1.state = zk.DELETING diff --git a/nodepool/tests/unit/test_driver_static.py b/nodepool/tests/unit/test_driver_static.py index 206cacb65..b1581362a 100644 --- a/nodepool/tests/unit/test_driver_static.py +++ b/nodepool/tests/unit/test_driver_static.py @@ -498,7 +498,8 @@ class TestDriverStatic(tests.DBTestCase): self.wait_for_config(pool) manager = pool.getProviderManager('openstack-provider') - manager._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image") + manager.adapter.IMAGE_CHECK_TIMEOUT = 0 req = zk.NodeRequest() req.state = zk.REQUESTED diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index a0ea800a2..92e021ce2 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -24,7 +24,8 @@ import testtools from nodepool import tests from nodepool.zk import zookeeper as zk from nodepool.zk.components import PoolComponent -from nodepool.driver.fake import provider as fakeprovider +from nodepool.driver.statemachine import StateMachineProvider +from nodepool.driver.fake import adapter as fakeadapter from nodepool.nodeutils import iterate_timeout import nodepool.launcher from nodepool.version import get_version_string @@ -35,6 +36,12 @@ from kazoo import exceptions as kze class TestLauncher(tests.DBTestCase): log = logging.getLogger("nodepool.TestLauncher") + def setUp(self): + super().setUp() + + StateMachineProvider.MINIMUM_SLEEP = 0.1 + StateMachineProvider.MAXIMUM_SLEEP = 1 + def test_node_assignment(self): ''' Successful 
node launch should have unlocked nodes in READY state @@ -89,7 +96,7 @@ class TestLauncher(tests.DBTestCase): # We check the "cloud" side attributes are set from nodepool side provider = pool.getProviderManager('fake-provider') - cloud_node = provider.getServer(node.hostname) + cloud_node = provider.adapter._getServer(node.external_id) self.assertEqual( cloud_node.metadata['nodepool_provider_name'], 'fake-provider') @@ -183,7 +190,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (max_cores, max_instances, max_ram) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -196,7 +203,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() req1 = zk.NodeRequest() req1.state = zk.REQUESTED @@ -412,7 +419,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (math.inf, 1, math.inf) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -472,7 +479,7 @@ class TestLauncher(tests.DBTestCase): nonlocal max_cores, max_instances, max_ram return (max_cores, max_instances, max_ram) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -485,7 +492,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() # Wait for a single node to be created req1 = zk.NodeRequest() @@ -545,7 +552,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails = 2 + manager.adapter.createServer_fails = 2 req = zk.NodeRequest() req.state = zk.REQUESTED @@ -553,7 +560,7 @@ class TestLauncher(tests.DBTestCase): self.zk.storeNodeRequest(req) req = self.waitForNodeRequest(req) - self.assertEqual(0, manager.createServer_fails) + self.assertEqual(0, manager.adapter.createServer_fails) self.assertEqual(req.state, zk.FAILED) self.assertNotEqual(req.declined_by, []) @@ -578,7 +585,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FULFILLED) # now change the azs in the cloud - cloud = pool.getProviderManager('fake-provider')._getClient() + cloud = pool.getProviderManager('fake-provider').adapter._getClient() cloud._azs = ['new-az1', 'new-az2'] # Do a second request. This will fail because the cached azs are not @@ -661,7 +668,7 @@ class TestLauncher(tests.DBTestCase): provider = (builder._upload_workers[0]._config. 
provider_managers['fake-provider']) - cloud_image = provider.getImage(image.external_id) + cloud_image = provider.adapter._findImage(image.external_id) self.assertEqual( cloud_image._kw.get('diskimage_metadata'), 'diskimage') self.assertEqual( @@ -690,7 +697,7 @@ class TestLauncher(tests.DBTestCase): # We check the "cloud" side attributes are set from nodepool side provider = pool.getProviderManager('fake-provider') - cloud_node = provider.getServer(nodes[0].hostname) + cloud_node = provider.adapter._getServer(nodes[0].external_id) self.assertEqual( cloud_node.metadata['nodepool_provider_name'], 'fake-provider') @@ -927,7 +934,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(nodes[0].provider, 'fake-provider') self.assertEqual(len(nodes_def_sg), 1) self.assertEqual(nodes_def_sg[0].provider, 'fake-provider') - client = pool.getProviderManager('fake-provider')._getClient() + client = pool.getProviderManager('fake-provider').adapter._getClient() for server in client._server_list: if server.id == nodes[0].external_id: self.assertEqual(server.security_groups, ['fake-sg']) @@ -1066,7 +1073,7 @@ class TestLauncher(tests.DBTestCase): # Get fake cloud record and set status to DELETING manager = pool.getProviderManager('fake-provider') - for instance in manager.listNodes(): + for instance in manager.adapter._listServers(): if instance.id == nodes[0].external_id: instance.status = 'DELETED' break @@ -1078,7 +1085,7 @@ class TestLauncher(tests.DBTestCase): self.waitForNodeDeletion(nodes[0]) api_record_remains = False - for instance in manager.listNodes(): + for instance in manager.adapter._listServers(): if instance.id == nodes[0].external_id: api_record_remains = True break @@ -1098,7 +1105,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails = 2 + manager.adapter.createServer_fails = 2 self.waitForImage('fake-provider', 'fake-image') req = zk.NodeRequest() @@ -1110,7 +1117,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FAILED) # retries in config is set to 2, so 2 attempts to create a server - self.assertEqual(0, manager.createServer_fails) + self.assertEqual(0, manager.adapter.createServer_fails) def test_node_launch_with_broken_znodes(self): """Test that node launch still works if there are broken znodes""" @@ -1152,7 +1159,7 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager.createServer_fails_with_external_id = 2 + manager.adapter.createServer_fails_with_external_id = 2 self.waitForImage('fake-provider', 'fake-image') # Stop the DeletedNodeWorker so we can make sure the fake znode that @@ -1170,7 +1177,8 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FAILED) # retries in config is set to 2, so 2 attempts to create a server - self.assertEqual(0, manager.createServer_fails_with_external_id) + self.assertEqual( + 0, manager.adapter.createServer_fails_with_external_id) # Request another node to check if nothing is wedged req = zk.NodeRequest() @@ -1186,7 +1194,7 @@ class TestLauncher(tests.DBTestCase): raise RuntimeError('Fake Error') self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, 'deleteServer', fail_delete)) + fakeadapter.FakeAdapter, '_deleteServer', fail_delete)) configfile = self.setup_config('node.yaml') pool = self.useNodepool(configfile, watermark_sleep=1) @@ -1215,10 +1223,10 @@ class 
TestLauncher(tests.DBTestCase): def test_node_delete_error(self): def error_delete(self, name): # Set ERROR status instead of deleting the node - self._getClient()._server_list[0].status = 'ERROR' + self._client._server_list[0].status = 'ERROR' self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, 'deleteServer', error_delete)) + fakeadapter.FakeAdapter, '_deleteServer', error_delete)) configfile = self.setup_config('node_delete_error.yaml') pool = self.useNodepool(configfile, watermark_sleep=1) @@ -1244,14 +1252,10 @@ class TestLauncher(tests.DBTestCase): # wait the cleanup thread to kick in time.sleep(5) - zk_nodes = self.zk.getNodes() - self.assertEqual(len(zk_nodes), 1) - node = self.zk.getNode(zk_nodes[0]) - self.assertEqual(node.state, zk.DELETING) - - # remove error nodes - pool.getProviderManager( - 'fake-provider')._getClient()._server_list.clear() + # Make sure it shows up as leaked + manager = pool.getProviderManager('fake-provider') + instances = list(manager.adapter.listInstances()) + self.assertEqual(1, len(instances)) def test_leaked_node(self): """Test that a leaked node is deleted""" @@ -1267,7 +1271,7 @@ class TestLauncher(tests.DBTestCase): # Make sure we have a node built and ready self.assertEqual(len(nodes), 1) manager = pool.getProviderManager('fake-provider') - servers = manager.listNodes() + servers = manager.adapter._listServers() self.assertEqual(len(servers), 1) # Delete the node from ZooKeeper, but leave the instance @@ -1286,7 +1290,7 @@ class TestLauncher(tests.DBTestCase): self.waitForInstanceDeletion(manager, nodes[0].external_id) # Make sure we end up with only one server (the replacement) - servers = manager.listNodes() + servers = manager.adapter._listServers() self.assertEqual(len(servers), 1) def test_max_ready_age(self): @@ -1599,9 +1603,10 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="fake-image") - manager._client.create_image(name="fake-image-windows") - manager._client.create_image(name="fake-image-windows-port") + manager.adapter.IMAGE_CHECK_TIMEOUT = 1 + manager.adapter._client.create_image(name="fake-image") + manager.adapter._client.create_image(name="fake-image-windows") + manager.adapter._client.create_image(name="fake-image-windows-port") nodes = self.waitForNodes('fake-label') self.assertEqual(len(nodes), 1) @@ -1638,7 +1643,8 @@ class TestLauncher(tests.DBTestCase): pool.start() self.wait_for_config(pool) manager = pool.getProviderManager('fake-provider') - manager._client.create_image(name="provider-named-image") + manager.adapter.IMAGE_CHECK_TIMEOUT = 1 + manager.adapter._client.create_image(name="provider-named-image") nodes = self.waitForNodes('fake-label') self.assertEqual(len(nodes), 1) @@ -1936,7 +1942,7 @@ class TestLauncher(tests.DBTestCase): self.wait_for_config(pool) manager = pool.getProviderManager('good-provider') - manager._client.create_image(name="good-image") + manager.adapter._client.create_image(name="good-image") good_req = zk.NodeRequest() good_req.state = zk.REQUESTED @@ -2013,11 +2019,11 @@ class TestLauncher(tests.DBTestCase): time.sleep(1) launcher_pools = self.zk.getRegisteredPools() - @mock.patch('nodepool.driver.openstack.handler.' - 'OpenStackNodeLauncher._launchNode') + @mock.patch('nodepool.driver.statemachine.' + 'StateMachineNodeLauncher.launch') def test_launchNode_session_expired(self, mock_launch): ''' - Test ZK session lost during _launchNode(). 
+ Test ZK session lost during launch(). ''' mock_launch.side_effect = kze.SessionExpiredError() @@ -2044,19 +2050,19 @@ class TestLauncher(tests.DBTestCase): while self.zk.countPoolNodes('fake-provider', 'main'): time.sleep(0) - @mock.patch('nodepool.driver.openstack.provider.' - 'OpenStackProvider.invalidateQuotaCache') + @mock.patch('nodepool.driver.statemachine.' + 'StateMachineProvider.invalidateQuotaCache') def test_launchNode_node_fault_message(self, mock_invalidatequotacache): ''' Test failed launch can get detailed node fault info if available. ''' - fake_client = fakeprovider.FakeLaunchAndGetFaultCloud() + fake_client = fakeadapter.FakeLaunchAndGetFaultCloud() def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_launch_retry.yaml') @@ -2089,20 +2095,19 @@ class TestLauncher(tests.DBTestCase): Test that the launcher keeps trying to spawn a node in case of a delete error ''' - fake_client = fakeprovider.FakeLaunchAndDeleteFailCloud( + fake_client = fakeadapter.FakeLaunchAndDeleteFailCloud( times_to_fail=1) def get_fake_client(*args, **kwargs): return fake_client self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider, '_getClient', + fakeadapter.FakeAdapter, '_getClient', get_fake_client)) configfile = self.setup_config('node_launch_retry.yaml') self.useBuilder(configfile) pool = self.useNodepool(configfile, watermark_sleep=1) - pool.cleanup_interval = 60 pool.start() self.waitForImage('fake-provider', 'fake-image') @@ -2248,7 +2253,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (0, 20, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2283,7 +2288,7 @@ class TestLauncher(tests.DBTestCase): def fake_get_quota(): return (0, 20, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2380,7 +2385,7 @@ class TestLauncher(tests.DBTestCase): return (100, max_instances, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2423,7 +2428,7 @@ class TestLauncher(tests.DBTestCase): return (100, max_instances, 1000000) self.useFixture(fixtures.MockPatchObject( - fakeprovider.FakeProvider.fake_cloud, '_get_quota', + fakeadapter.FakeAdapter.fake_cloud, '_get_quota', fake_get_quota )) @@ -2565,7 +2570,7 @@ class TestLauncher(tests.DBTestCase): self.waitForNodes('fake-label') manager = pool.getProviderManager('fake-provider') - down_ports = manager.listPorts(status='DOWN') + down_ports = manager.adapter._listPorts(status='DOWN') self.assertEqual(2, len(down_ports)) self.log.debug("Down ports: %s", down_ports) @@ -2580,13 +2585,10 @@ class TestLauncher(tests.DBTestCase): except AssertionError: # config still hasn't updated, retry manager = pool.getProviderManager('fake-provider') - # Reset the client as a new fake client will have been - # created. 
- manager.resetClient() for _ in iterate_timeout(4, Exception, 'assert ports are cleaned'): try: - down_ports = manager.listPorts(status='DOWN') + down_ports = manager.adapter._listPorts(status='DOWN') self.assertEqual(0, len(down_ports)) break except AssertionError: diff --git a/nodepool/tests/unit/test_sdk_integration.py b/nodepool/tests/unit/test_sdk_integration.py index 5bb64a869..e7cbe9b6e 100644 --- a/nodepool/tests/unit/test_sdk_integration.py +++ b/nodepool/tests/unit/test_sdk_integration.py @@ -63,7 +63,7 @@ class TestShadeIntegration(tests.IntegrationTestCase): # thread that causes wait_for_threads in subsequent tests to fail. self.addCleanup(pm.stop) pm.start(None) - self.assertEqual(pm._client.auth, auth_data) + self.assertEqual(pm.adapter._client.auth, auth_data) def test_nodepool_occ_config_reload(self): configfile = self.setup_config('integration_occ.yaml') @@ -76,8 +76,8 @@ class TestShadeIntegration(tests.IntegrationTestCase): pool = self.useNodepool(configfile, watermark_sleep=1) pool.updateConfig() - provider_manager = pool.config.provider_managers['real-provider'] - self.assertEqual(provider_manager._client.auth, auth_data) + pm = pool.config.provider_managers['real-provider'] + self.assertEqual(pm.adapter._client.auth, auth_data) # update the config auth_data['password'] = 'os_new_real' @@ -86,5 +86,5 @@ class TestShadeIntegration(tests.IntegrationTestCase): yaml.safe_dump(occ_config, h) pool.updateConfig() - provider_manager = pool.config.provider_managers['real-provider'] - self.assertEqual(provider_manager._client.auth, auth_data) + pm = pool.config.provider_managers['real-provider'] + self.assertEqual(pm.adapter._client.auth, auth_data)
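For driver authors, the two new optional `Adapter` hooks introduced in `nodepool/driver/statemachine.py` (`getAZs` and `labelReady`) can be implemented roughly as sketched below. The client object and its methods are assumptions standing in for whatever SDK a concrete driver wraps, and the label attributes follow the OpenStack-style config objects; only the hook signatures come from this change:

```python
from nodepool.driver import statemachine

class ExampleCloudClient:
    """Toy stand-in for a provider SDK client (assumption)."""
    def list_availability_zones(self):
        return ['zone-a', 'zone-b']

    def image_exists(self, name):
        return name == 'ubuntu-22.04'

class ExampleAdapter(statemachine.Adapter):
    def __init__(self, provider_config):
        self.provider = provider_config
        self._client = ExampleCloudClient()

    def getAZs(self):
        # Returning [None] preserves the default "let the cloud place it"
        # behaviour when the provider exposes no zones.
        return self._client.list_availability_zones() or [None]

    def labelReady(self, label):
        # Diskimage readiness is handled by the launcher itself; only
        # cloud-image labels need validation here.
        if not label.cloud_image:
            return True
        if label.cloud_image.image_id:
            return True
        return self._client.image_exists(label.cloud_image.external_name)
```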