diff --git a/nodepool/driver/fake/adapter.py b/nodepool/driver/fake/adapter.py
index e5526dab5..3121c5bd9 100644
--- a/nodepool/driver/fake/adapter.py
+++ b/nodepool/driver/fake/adapter.py
@@ -47,6 +47,15 @@ class Dummy(object):
         except AttributeError:
             pass
 
+    def _get_dict(self):
+        data = {}
+        for k in self.__kw.keys():
+            data[k] = getattr(self, k)
+        data.pop('event', None)
+        data.pop('_kw', None)
+        data.pop('manager', None)
+        return data
+
     def __repr__(self):
         args = []
         for k in self.__kw.keys():
@@ -69,6 +78,36 @@ class Dummy(object):
     def set(self, key, value):
         setattr(self, key, value)
 
+    def copy(self):
+        data = self._get_dict()
+        return Dummy(self.__kind, **data)
+
+
+class FakeResponse:
+    def __init__(self, data):
+        self._data = data
+        self.links = []
+
+    def json(self):
+        return self._data
+
+
+class FakeSession:
+    def __init__(self, cloud):
+        self.cloud = cloud
+
+    def get(self, uri, headers, params):
+        if uri == '/servers/detail':
+            server_list = []
+            for server in self.cloud._server_list:
+                data = server._get_dict()
+                data['hostId'] = data.pop('host_id')
+                data['OS-EXT-AZ:availability_zone'] = data.pop('location').zone
+                data['os-extended-volumes:volumes_attached'] =\
+                    data.pop('volumes')
+                server_list.append(data)
+            return FakeResponse({'servers': server_list})
+
 
 class FakeOpenStackCloud(object):
     log = logging.getLogger("nodepool.FakeOpenStackCloud")
@@ -82,6 +121,7 @@ class FakeOpenStackCloud(object):
         return 100, 1000000
 
     def __init__(self, images=None, networks=None):
+        self.compute = FakeSession(self)
        self.pause_creates = False
         self._image_list = images
         if self._image_list is None:
@@ -225,7 +265,7 @@ class FakeOpenStackCloud(object):
                       name='FakeProvider create',
                       args=(s, 0.1, done_status))
         t.start()
-        return s
+        return s.copy()
 
     def _delete(self, name_or_id, instance_list):
         self.log.debug("Delete from %s" % (repr(instance_list),))
@@ -388,8 +428,9 @@ class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud):
                 *args, **kwargs,
                 done_status='ERROR')
             # Don't wait for the async update
-            server.status = 'ERROR'
-            server.fault = {'message': 'quota server fault'}
+            orig_server = self._get(server.id, self._server_list)
+            orig_server.status = 'ERROR'
+            orig_server.fault = {'message': 'quota server fault'}
             raise OpenStackCloudCreateException('server', server.id)
 
 
diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py
index e83609b2b..dbb287744 100644
--- a/nodepool/driver/openstack/adapter.py
+++ b/nodepool/driver/openstack/adapter.py
@@ -17,11 +17,13 @@
 # limitations under the License.
 
 from concurrent.futures import ThreadPoolExecutor
+from collections import UserDict
 import functools
 import logging
 import math
 import time
 import operator
+import urllib
 
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
@@ -66,21 +68,68 @@ def quota_from_limits(compute, volume):
     return QuotaInformation(**args)
 
+class NodepoolOpenStackServer(UserDict):
+    # Most of OpenStackSDK is designed around a dictionary interface,
+    # but due to the historic use of the Munch object, some code
+    # (add_server_interfaces) accesses values by attribute instead of
+    # key. For just those values, we provide getters and setters.
+
+    @property
+    def access_ipv4(self):
+        return self.data.get('access_ipv4')
+
+    @access_ipv4.setter
+    def access_ipv4(self, value):
+        self.data['access_ipv4'] = value
+
+    @property
+    def public_v4(self):
+        return self.data.get('public_v4')
+
+    @public_v4.setter
+    def public_v4(self, value):
+        self.data['public_v4'] = value
+
+    @property
+    def private_v4(self):
+        return self.data.get('private_v4')
+
+    @private_v4.setter
+    def private_v4(self, value):
+        self.data['private_v4'] = value
+
+    @property
+    def access_ipv6(self):
+        return self.data.get('access_ipv6')
+
+    @access_ipv6.setter
+    def access_ipv6(self, value):
+        self.data['access_ipv6'] = value
+
+    @property
+    def public_v6(self):
+        return self.data.get('public_v6')
+
+    @public_v6.setter
+    def public_v6(self, value):
+        self.data['public_v6'] = value
+
+
 class OpenStackInstance(statemachine.Instance):
     def __init__(self, provider, server, quota):
         super().__init__()
-        self.external_id = server.id
-        self.metadata = server.metadata
-        self.private_ipv4 = server.private_v4
+        self.external_id = server['id']
+        self.metadata = server.get('metadata', {})
+        self.private_ipv4 = server.get('private_v4')
         self.private_ipv6 = None
-        self.public_ipv4 = server.public_v4
-        self.public_ipv6 = server.public_v6
-        self.host_id = server.host_id
+        self.public_ipv4 = server.get('public_v4')
+        self.public_ipv6 = server.get('public_v6')
+        self.host_id = server['hostId']
 
         self.cloud = provider.cloud_config.name
         self.region = provider.region_name
-        self.az = server.location.zone
+        self.az = server.get('OS-EXT-AZ:availability_zone')
 
-        self.interface_ip = server.interface_ip
+        self.interface_ip = server.get('interface_ip')
         # TODO: this doesn't match the behavior of other drivers
         # but is here for backwards compatibility.
         self.private_ipv4 = self.private_ipv4 or self.public_ipv4
@@ -118,7 +167,7 @@ class OpenStackDeleteStateMachine(statemachine.StateMachine):
             self.server = self.adapter._getServer(self.external_id)
             if (self.server and
                 self.adapter._hasFloatingIps() and
-                self.server.addresses):
+                self.server.get('addresses')):
                 self.floating_ips = self.adapter._getFloatingIps(self.server)
                 for fip in self.floating_ips:
                     self.adapter._deleteFloatingIp(fip)
@@ -274,7 +323,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                 self.server = self.adapter._completeApi(self.create_future)
                 if self.server is None:
                     return
-                self.external_id = self.server.id
+                self.external_id = self.server['id']
                 self.state = self.SERVER_CREATING
             except openstack.cloud.exc.OpenStackCloudCreateException as e:
                 if e.resource_id:
@@ -295,7 +344,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
         if self.state == self.SERVER_CREATING:
             self.server = self.adapter._refreshServer(self.server)
 
-            if self.server.status == 'ACTIVE':
+            if self.server['status'] == 'ACTIVE':
                 if (self.label.pool.auto_floating_ip and
                     self.adapter._needsFloatingIp(self.server)):
                     self.floating_ip = self.adapter._createFloatingIp(
@@ -303,7 +352,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                     self.state = self.FLOATING_IP_CREATING
                 else:
                     self.state = self.COMPLETE
-            elif self.server.status == 'ERROR':
+            elif self.server['status'] == 'ERROR':
                 if ('fault' in self.server and self.server['fault'] is not None
                         and 'message' in self.server['fault']):
                     self.log.error(
@@ -404,10 +453,11 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def listResources(self):
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
-            yield OpenStackResource(server.metadata,
-                                    OpenStackResource.TYPE_INSTANCE, server.id)
+            yield OpenStackResource(server.get('metadata', {}),
+                                    OpenStackResource.TYPE_INSTANCE,
+                                    server['id'])
         # Floating IP and port leakage can't be handled by the
         # automatic resource cleanup in cleanupLeakedResources because
         # openstack doesn't store metadata on those objects, so we
@@ -427,12 +477,13 @@ class OpenStackAdapter(statemachine.Adapter):
         for volume in self._listVolumes():
             volumes[volume.id] = volume
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
             flavor = self._getFlavorFromServer(server)
             server_volumes = []
-            for vattach in server.volumes:
-                volume = volumes.get(vattach.id)
+            for vattach in server.get(
+                    'os-extended-volumes:volumes_attached', []):
+                volume = volumes.get(vattach['id'])
                 if volume:
                     server_volumes.append(volume)
             quota = quota_from_flavor(flavor, volumes=server_volumes)
@@ -664,9 +715,94 @@ class OpenStackAdapter(statemachine.Adapter):
                     name, self.provider.name))
         return network
 
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _simpleServerList(self):
+        session = self._client.compute
+        limit = None
+        query_params = {}
+        uri = '/servers/detail'
+        ret = []
+        while uri:
+            response = session.get(
+                uri,
+                headers={"Accept": "application/json"},
+                params=query_params,
+            )
+            data = response.json()
+
+            last_marker = query_params.pop('marker', None)
+            query_params.pop('limit', None)
+
+            resources = data['servers']
+            if not isinstance(resources, list):
+                resources = [resources]
+
+            ret += [NodepoolOpenStackServer(x) for x in resources]
+
+            if resources:
+                marker = resources[-1]['id']
+                uri, next_params = self._getNextLink(
+                    uri, response, data, marker, limit
+                )
+                try:
+                    if next_params['marker'] == last_marker:
+                        raise Exception(
+                            'Endless pagination loop detected, aborting'
+                        )
+                except KeyError:
+                    pass
+                query_params.update(next_params)
+            else:
+                break
+        return ret
+
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _getNextLink(self, uri, response, data, marker, limit):
+        pagination_key = 'servers_links'
+        next_link = None
+        params = {}
+
+        if isinstance(data, dict):
+            links = data.get(pagination_key, {})
+
+            for item in links:
+                if item.get('rel') == 'next' and 'href' in item:
+                    next_link = item['href']
+                    break
+
+        if next_link and next_link.startswith('/v'):
+            next_link = next_link[next_link.find('/', 1):]
+
+        if not next_link and 'next' in response.links:
+            # RFC5988 specifies Link headers and requests parses them if
+            # they are there. We prefer link dicts in the resource body,
+            # but if those aren't there and Link headers are, use them.
+            next_link = response.links['next']['uri']
+
+        # Parse params from the Link (next page URL) into params.
+        # This prevents duplication of query parameters that, with a large
+        # number of pages, would eventually result in an HTTP 414 error.
+        if next_link:
+            parts = urllib.parse.urlparse(next_link)
+            query_params = urllib.parse.parse_qs(parts.query)
+            params.update(query_params)
+            next_link = urllib.parse.urljoin(next_link, parts.path)
+
+        # If we still have no link, and limit was given and is non-zero,
+        # and the number of records yielded equals the limit, then the user
+        # is playing pagination ball so we should go ahead and try once more.
+        if not next_link and limit:
+            next_link = uri
+            params['marker'] = marker
+            params['limit'] = limit
+
+        return next_link, params
+
     def _listServers(self):
-        with Timer(self.log, 'API call list_servers'):
-            return self._client.list_servers(bare=True)
+        with Timer(self.log, 'API call detailed server list'):
+            return self._simpleServerList()
 
     def _listVolumes(self):
         try:
@@ -680,7 +816,7 @@ class OpenStackAdapter(statemachine.Adapter):
         return self._client.list_floating_ips()
 
     def _refreshServer(self, obj):
-        ret = self._getServer(obj.id)
+        ret = self._getServer(obj['id'])
         if ret:
             return ret
         return obj
@@ -691,8 +827,8 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def _getServer(self, external_id):
         for server in self._listServers():
-            if server.id == external_id:
-                if server.status in ['ACTIVE', 'ERROR']:
+            if server['id'] == external_id:
+                if server['status'] in ['ACTIVE', 'ERROR']:
                     return self._expandServer(server)
                 return server
         return None
@@ -707,15 +843,15 @@ class OpenStackAdapter(statemachine.Adapter):
         if obj is None:
             return obj
         for server in self._listServers():
-            if server.id == obj.id:
-                if server.status.lower() == 'deleted':
+            if server['id'] == obj['id']:
+                if server['status'].lower() == 'deleted':
                     return None
                 return server
         return None
 
     def _refreshFloatingIp(self, obj):
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 return fip
         return obj
 
@@ -723,7 +859,7 @@ class OpenStackAdapter(statemachine.Adapter):
         if obj is None:
             return obj
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 if fip.status == 'DOWN':
                     return None
                 return fip
@@ -774,10 +910,10 @@ class OpenStackAdapter(statemachine.Adapter):
         # In later versions it returns the information we're looking for.
         # If we get the information we want, we do not need to try to
         # lookup the flavor in our list.
-        if hasattr(server.flavor, 'vcpus'):
-            return server.flavor
+        if server['flavor'].get('vcpus'):
+            return server['flavor']
         else:
-            return self._findFlavorById(server.flavor.id)
+            return self._findFlavorById(server['flavor']['id'])
 
     # The port cleanup logic. We don't get tags or metadata, so we
     # have to figure this out on our own.
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 5f6586f37..97540ff24 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -592,10 +592,12 @@ class DBTestCase(BaseTestCase):
                                       interval=1):
             if hasattr(manager, 'adapter'):
                 servers = manager.adapter._listServers()
+                server_ids = [s['id'] for s in servers]
             else:
                 # TODO: remove once all drivers use statemachine
                 servers = manager.listNodes()
-            if not (instance_id in [s.id for s in servers]):
+                server_ids = [s.id for s in servers]
+            if not (instance_id in server_ids):
                 break
 
     def waitForNodeRequestLockDeletion(self, request_id):
diff --git a/nodepool/tests/fixtures/node_max_hold_age_2.yaml b/nodepool/tests/fixtures/node_max_hold_age_2.yaml
index 7deb8ad18..beb4e2a3b 100644
--- a/nodepool/tests/fixtures/node_max_hold_age_2.yaml
+++ b/nodepool/tests/fixtures/node_max_hold_age_2.yaml
@@ -2,7 +2,7 @@
 elements-dir: .
 images-dir: '{images_dir}'
 build-log-dir: '{build_log_dir}'
-max-hold-age: 10
+max-hold-age: 20
 
 zookeeper-servers:
   - host: {zookeeper_host}
diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py
index e1345161e..dac293815 100644
--- a/nodepool/tests/unit/test_launcher.py
+++ b/nodepool/tests/unit/test_launcher.py
@@ -100,11 +100,12 @@ class TestLauncher(tests.DBTestCase):
         provider = pool.getProviderManager('fake-provider')
         cloud_node = provider.adapter._getServer(node.external_id)
         self.assertEqual(
-            cloud_node.metadata['nodepool_provider_name'],
+            cloud_node['metadata']['nodepool_provider_name'],
             'fake-provider')
-        self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-        self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-        self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+        self.assertEqual(cloud_node['metadata']['nodepool_pool_name'],
+                         'main')
+        self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+        self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                          'Tenant is tenant-1')
 
         self.zk.lockNode(node, blocking=False)
@@ -736,11 +737,11 @@ class TestLauncher(tests.DBTestCase):
         provider = pool.getProviderManager('fake-provider')
         cloud_node = provider.adapter._getServer(nodes[0].external_id)
         self.assertEqual(
-            cloud_node.metadata['nodepool_provider_name'],
+            cloud_node['metadata']['nodepool_provider_name'],
             'fake-provider')
-        self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-        self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-        self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+        self.assertEqual(cloud_node['metadata']['nodepool_pool_name'], 'main')
+        self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+        self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                          'Tenant is None')
 
     def test_node_network_cli(self):
@@ -1110,9 +1111,9 @@ class TestLauncher(tests.DBTestCase):
 
         # Get fake cloud record and set status to DELETING
         manager = pool.getProviderManager('fake-provider')
-        for instance in manager.adapter._listServers():
+        for instance in manager.adapter._client._server_list:
             if instance.id == nodes[0].external_id:
                 instance.status = 'DELETED'
                 break
 
         nodes[0].state = zk.DELETING
@@ -1123,7 +1124,7 @@ class TestLauncher(tests.DBTestCase):
 
         api_record_remains = False
         for instance in manager.adapter._listServers():
-            if instance.id == nodes[0].external_id:
+            if instance['id'] == nodes[0].external_id:
                 api_record_remains = True
                 break
         self.assertTrue(api_record_remains)
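
A quick illustration of the pattern behind NodepoolOpenStackServer above: the object behaves as a plain dict for the adapter code, while a few fields stay reachable as attributes for the legacy SDK helper (add_server_interfaces). This is a minimal standalone sketch, not part of the change; ServerDict and its single property are illustrative names only.

    from collections import UserDict

    class ServerDict(UserDict):
        # Expose one field both ways: as a dict key and as an attribute.
        @property
        def public_v4(self):
            return self.data.get('public_v4')

        @public_v4.setter
        def public_v4(self, value):
            self.data['public_v4'] = value

    s = ServerDict({'id': 'abc123'})
    s.public_v4 = '192.0.2.10'              # attribute write (legacy SDK style)
    assert s['public_v4'] == '192.0.2.10'   # dict read (adapter style)
    assert s['id'] == 'abc123'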
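
And a minimal standalone sketch of the marker-based pagination that _simpleServerList and _getNextLink implement: follow the 'next' link from servers_links until a page comes back empty. FakePagedSession is hypothetical and stands in for the compute session; error handling, RFC5988 Link headers, and the limit fallback are omitted.

    import urllib.parse

    class FakeResponse:
        def __init__(self, data):
            self._data = data

        def json(self):
            return self._data

    class FakePagedSession:
        # Serves two pages; the first page carries a 'next' link.
        def __init__(self):
            self.pages = {
                None: {'servers': [{'id': 'a'}, {'id': 'b'}],
                       'servers_links': [{'rel': 'next',
                                          'href': '/servers/detail?marker=b'}]},
                'b': {'servers': []},
            }

        def get(self, uri, headers, params):
            marker = params.get('marker')
            if isinstance(marker, list):
                marker = marker[0]
            return FakeResponse(self.pages[marker])

    def simple_server_list(session):
        uri, query_params, ret = '/servers/detail', {}, []
        while uri:
            data = session.get(uri, headers={}, params=query_params).json()
            ret += data['servers']
            links = data.get('servers_links', [])
            next_link = next((li['href'] for li in links
                              if li.get('rel') == 'next'), None)
            if not next_link:
                break
            # Carry the marker over as query parameters for the next request,
            # mirroring _getNextLink's parse of the 'next' URL.
            parts = urllib.parse.urlparse(next_link)
            query_params = urllib.parse.parse_qs(parts.query)
            uri = parts.path
        return ret

    print([s['id'] for s in simple_server_list(FakePagedSession())])
    # ['a', 'b']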