From 066699f88aa66d031cdaf6477e9e039378cebe97 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Wed, 5 Jul 2023 15:17:04 -0700
Subject: [PATCH] Use low-level OpenStack SDK calls for server listing

The OpenStack SDK performs a lot of processing on the JSON data
returned by nova, and on large server lists, this processing can
dwarf the actual time needed to receive and parse the JSON.  Nodepool
uses very little of this information, so let's use the keystoneauth
session to get a simple JSON list instead.

The Server object that the SDK normally returns is a hybrid object
that provides both attributes and dictionary keys.  One method that
we call (add_server_interfaces) still accesses a few values by
attribute, so we create a UserDict subclass to handle those.

Nodepool-internal references are updated from attribute access to
dictionary keys.

Change-Id: Iecc5976858e8d2ee6894a521f6a30f10ae9c6177
---
 nodepool/driver/fake/adapter.py               |  47 ++++-
 nodepool/driver/openstack/adapter.py          | 196 +++++++++++++++---
 nodepool/tests/__init__.py                    |   4 +-
 .../tests/fixtures/node_max_hold_age_2.yaml   |   2 +-
 nodepool/tests/unit/test_launcher.py          |  23 +-
 5 files changed, 226 insertions(+), 46 deletions(-)
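Note on the UserDict approach described above (not part of the patch):
a minimal standalone sketch of the hybrid access pattern.  The class
name HybridServer is invented for illustration; public_v4 is one of
the fields the patch actually wraps.

    from collections import UserDict

    class HybridServer(UserDict):
        # Dictionary-first object that also exposes one field as an
        # attribute, mirroring what NodepoolOpenStackServer does for
        # the fields that add_server_interfaces touches.
        @property
        def public_v4(self):
            return self.data.get('public_v4')

        @public_v4.setter
        def public_v4(self, value):
            self.data['public_v4'] = value

    s = HybridServer({'id': 'abc123'})
    s.public_v4 = '198.51.100.7'             # attribute write...
    assert s['public_v4'] == '198.51.100.7'  # ...is visible as a key
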
diff --git a/nodepool/driver/fake/adapter.py b/nodepool/driver/fake/adapter.py
index e5526dab5..3121c5bd9 100644
--- a/nodepool/driver/fake/adapter.py
+++ b/nodepool/driver/fake/adapter.py
@@ -47,6 +47,15 @@ class Dummy(object):
         except AttributeError:
             pass
 
+    def _get_dict(self):
+        data = {}
+        for k in self.__kw.keys():
+            data[k] = getattr(self, k)
+        data.pop('event')
+        data.pop('_kw')
+        data.pop('manager')
+        return data
+
     def __repr__(self):
         args = []
         for k in self.__kw.keys():
@@ -69,6 +78,36 @@ class Dummy(object):
     def set(self, key, value):
         setattr(self, key, value)
 
+    def copy(self):
+        data = self._get_dict()
+        return Dummy(self.__kind, **data)
+
+
+class FakeResponse:
+    def __init__(self, data):
+        self._data = data
+        self.links = []
+
+    def json(self):
+        return self._data
+
+
+class FakeSession:
+    def __init__(self, cloud):
+        self.cloud = cloud
+
+    def get(self, uri, headers, params):
+        if uri == '/servers/detail':
+            server_list = []
+            for server in self.cloud._server_list:
+                data = server._get_dict()
+                data['hostId'] = data.pop('host_id')
+                data['OS-EXT-AZ:availability_zone'] = data.pop('location').zone
+                data['os-extended-volumes:volumes_attached'] =\
+                    data.pop('volumes')
+                server_list.append(data)
+            return FakeResponse({'servers': server_list})
+
 
 class FakeOpenStackCloud(object):
     log = logging.getLogger("nodepool.FakeOpenStackCloud")
@@ -82,6 +121,7 @@ class FakeOpenStackCloud(object):
         return 100, 1000000
 
     def __init__(self, images=None, networks=None):
+        self.compute = FakeSession(self)
         self.pause_creates = False
         self._image_list = images
         if self._image_list is None:
@@ -225,7 +265,7 @@ class FakeOpenStackCloud(object):
                              name='FakeProvider create',
                              args=(s, 0.1, done_status))
         t.start()
-        return s
+        return s.copy()
 
     def _delete(self, name_or_id, instance_list):
         self.log.debug("Delete from %s" % (repr(instance_list),))
@@ -388,8 +428,9 @@ class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud):
             *args, **kwargs,
             done_status='ERROR')
         # Don't wait for the async update
-        server.status = 'ERROR'
-        server.fault = {'message': 'quota server fault'}
+        orig_server = self._get(server.id, self._server_list)
+        orig_server.status = 'ERROR'
+        orig_server.fault = {'message': 'quota server fault'}
         raise OpenStackCloudCreateException('server', server.id)
 
 
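Note on the FakeSession above (not part of the patch): it mirrors the
raw compute API call that the new _simpleServerList() performs against
a real cloud.  Roughly, the call shape is as follows; this sketch
assumes a clouds.yaml entry named "mycloud" (an illustrative name) and
relies on conn.compute being a keystoneauth adapter that accepts
endpoint-relative URLs, which is what the patch itself uses.

    import openstack

    conn = openstack.connect(cloud='mycloud')

    # One raw GET against the compute endpoint; keystoneauth handles
    # authentication, catalog lookup, and version discovery.
    response = conn.compute.get(
        '/servers/detail',
        headers={'Accept': 'application/json'},
        params={},
    )
    # Plain dicts straight from nova, with no SDK post-processing.
    for server in response.json()['servers']:
        print(server['id'], server['status'])
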
diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py
index e83609b2b..dbb287744 100644
--- a/nodepool/driver/openstack/adapter.py
+++ b/nodepool/driver/openstack/adapter.py
@@ -17,11 +17,13 @@
 # limitations under the License.
 
 from concurrent.futures import ThreadPoolExecutor
+from collections import UserDict
 import functools
 import logging
 import math
 import time
 import operator
+import urllib
 
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
@@ -66,21 +68,68 @@ def quota_from_limits(compute, volume):
     return QuotaInformation(**args)
 
 
+class NodepoolOpenStackServer(UserDict):
+    # Most of OpenStackSDK is designed around a dictionary interface,
+    # but due to the historic use of the Munch object, some code
+    # (add_server_interfaces) accesses values by attribute instead of
+    # key.  For just those values, we provide getters and setters.
+
+    @property
+    def access_ipv4(self):
+        return self.data.get('access_ipv4')
+
+    @access_ipv4.setter
+    def access_ipv4(self, value):
+        self.data['access_ipv4'] = value
+
+    @property
+    def public_v4(self):
+        return self.data.get('public_v4')
+
+    @public_v4.setter
+    def public_v4(self, value):
+        self.data['public_v4'] = value
+
+    @property
+    def private_v4(self):
+        return self.data.get('private_v4')
+
+    @private_v4.setter
+    def private_v4(self, value):
+        self.data['private_v4'] = value
+
+    @property
+    def access_ipv6(self):
+        return self.data.get('access_ipv6')
+
+    @access_ipv6.setter
+    def access_ipv6(self, value):
+        self.data['access_ipv6'] = value
+
+    @property
+    def public_v6(self):
+        return self.data.get('public_v6')
+
+    @public_v6.setter
+    def public_v6(self, value):
+        self.data['public_v6'] = value
+
+
 class OpenStackInstance(statemachine.Instance):
     def __init__(self, provider, server, quota):
         super().__init__()
-        self.external_id = server.id
-        self.metadata = server.metadata
-        self.private_ipv4 = server.private_v4
+        self.external_id = server['id']
+        self.metadata = server.get('metadata', {})
+        self.private_ipv4 = server.get('private_v4')
         self.private_ipv6 = None
-        self.public_ipv4 = server.public_v4
-        self.public_ipv6 = server.public_v6
-        self.host_id = server.host_id
+        self.public_ipv4 = server.get('public_v4')
+        self.public_ipv6 = server.get('public_v6')
+        self.host_id = server['hostId']
         self.cloud = provider.cloud_config.name
         self.region = provider.region_name
-        self.az = server.location.zone
+        self.az = server.get('OS-EXT-AZ:availability_zone')
 
-        self.interface_ip = server.interface_ip
+        self.interface_ip = server.get('interface_ip')
         # TODO: this doesn't match the behavior of other drivers
         # but is here for backwards compatibility.
         self.private_ipv4 = self.private_ipv4 or self.public_ipv4
@@ -118,7 +167,7 @@ class OpenStackDeleteStateMachine(statemachine.StateMachine):
                 self.server = self.adapter._getServer(self.external_id)
                 if (self.server and
                     self.adapter._hasFloatingIps() and
-                    self.server.addresses):
+                    self.server.get('addresses')):
                     self.floating_ips = self.adapter._getFloatingIps(self.server)
                     for fip in self.floating_ips:
                         self.adapter._deleteFloatingIp(fip)
@@ -274,7 +323,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                 self.server = self.adapter._completeApi(self.create_future)
                 if self.server is None:
                     return
-                self.external_id = self.server.id
+                self.external_id = self.server['id']
                 self.state = self.SERVER_CREATING
             except openstack.cloud.exc.OpenStackCloudCreateException as e:
                 if e.resource_id:
@@ -295,7 +344,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
 
         if self.state == self.SERVER_CREATING:
             self.server = self.adapter._refreshServer(self.server)
-            if self.server.status == 'ACTIVE':
+            if self.server['status'] == 'ACTIVE':
                 if (self.label.pool.auto_floating_ip and
                     self.adapter._needsFloatingIp(self.server)):
                     self.floating_ip = self.adapter._createFloatingIp(
@@ -303,7 +352,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                     self.state = self.FLOATING_IP_CREATING
                 else:
                     self.state = self.COMPLETE
-            elif self.server.status == 'ERROR':
+            elif self.server['status'] == 'ERROR':
                 if ('fault' in self.server and self.server['fault'] is not None
                     and 'message' in self.server['fault']):
                     self.log.error(
@@ -404,10 +453,11 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def listResources(self):
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
-            yield OpenStackResource(server.metadata,
-                                    OpenStackResource.TYPE_INSTANCE, server.id)
+            yield OpenStackResource(server.get('metadata', {}),
+                                    OpenStackResource.TYPE_INSTANCE,
+                                    server['id'])
         # Floating IP and port leakage can't be handled by the
         # automatic resource cleanup in cleanupLeakedResources because
         # openstack doesn't store metadata on those objects, so we
@@ -427,12 +477,13 @@ class OpenStackAdapter(statemachine.Adapter):
         for volume in self._listVolumes():
             volumes[volume.id] = volume
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
             flavor = self._getFlavorFromServer(server)
             server_volumes = []
-            for vattach in server.volumes:
-                volume = volumes.get(vattach.id)
+            for vattach in server.get(
+                    'os-extended-volumes:volumes_attached', []):
+                volume = volumes.get(vattach['id'])
                 if volume:
                     server_volumes.append(volume)
             quota = quota_from_flavor(flavor, volumes=server_volumes)
@@ -664,9 +715,94 @@ class OpenStackAdapter(statemachine.Adapter):
                 name, self.provider.name))
         return network
 
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _simpleServerList(self):
+        session = self._client.compute
+        limit = None
+        query_params = {}
+        uri = '/servers/detail'
+        ret = []
+        while uri:
+            response = session.get(
+                uri,
+                headers={"Accept": "application/json"},
+                params=query_params,
+            )
+            data = response.json()
+
+            last_marker = query_params.pop('marker', None)
+            query_params.pop('limit', None)
+
+            resources = data['servers']
+            if not isinstance(resources, list):
+                resources = [resources]
+
+            ret += [NodepoolOpenStackServer(x) for x in resources]
+
+            if resources:
+                marker = resources[-1]['id']
+                uri, next_params = self._getNextLink(
+                    uri, response, data, marker, limit
+                )
+                try:
+                    if next_params['marker'] == last_marker:
+                        raise Exception(
+                            'Endless pagination loop detected, aborting'
+                        )
+                except KeyError:
+                    pass
+                query_params.update(next_params)
+            else:
+                break
+        return ret
+
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _getNextLink(self, uri, response, data, marker, limit):
+        pagination_key = 'servers_links'
+        next_link = None
+        params = {}
+
+        if isinstance(data, dict):
+            links = data.get(pagination_key, {})
+
+            for item in links:
+                if item.get('rel') == 'next' and 'href' in item:
+                    next_link = item['href']
+                    break
+
+        if next_link and next_link.startswith('/v'):
+            next_link = next_link[next_link.find('/', 1):]
+
+        if not next_link and 'next' in response.links:
+            # RFC5988 specifies Link headers and requests parses them if they
+            # are there. We prefer link dicts in resource body, but if those
+            # aren't there and Link headers are, use them.
+            next_link = response.links['next']['uri']
+
+        # Parse params from Link (next page URL) into params.
+        # This prevents duplication of query parameters that with large
+        # number of pages result in HTTP 414 error eventually.
+        if next_link:
+            parts = urllib.parse.urlparse(next_link)
+            query_params = urllib.parse.parse_qs(parts.query)
+            params.update(query_params)
+            next_link = urllib.parse.urljoin(next_link, parts.path)
+
+        # If we still have no link, and limit was given and is non-zero,
+        # and the number of records yielded equals the limit, then the user
+        # is playing pagination ball so we should go ahead and try once more.
+        if not next_link and limit:
+            next_link = uri
+            params['marker'] = marker
+            params['limit'] = limit
+
+        return next_link, params
+
     def _listServers(self):
-        with Timer(self.log, 'API call list_servers'):
-            return self._client.list_servers(bare=True)
+        with Timer(self.log, 'API call detailed server list'):
+            return self._simpleServerList()
 
     def _listVolumes(self):
         try:
@@ -680,7 +816,7 @@ class OpenStackAdapter(statemachine.Adapter):
         return self._client.list_floating_ips()
 
     def _refreshServer(self, obj):
-        ret = self._getServer(obj.id)
+        ret = self._getServer(obj['id'])
         if ret:
             return ret
         return obj
@@ -691,8 +827,8 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def _getServer(self, external_id):
         for server in self._listServers():
-            if server.id == external_id:
-                if server.status in ['ACTIVE', 'ERROR']:
+            if server['id'] == external_id:
+                if server['status'] in ['ACTIVE', 'ERROR']:
                     return self._expandServer(server)
                 return server
         return None
@@ -707,15 +843,15 @@ class OpenStackAdapter(statemachine.Adapter):
 
         if obj is None:
             return obj
         for server in self._listServers():
-            if server.id == obj.id:
-                if server.status.lower() == 'deleted':
+            if server['id'] == obj['id']:
+                if server['status'].lower() == 'deleted':
                     return None
                 return server
         return None
 
     def _refreshFloatingIp(self, obj):
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 return fip
         return obj
@@ -723,7 +859,7 @@ class OpenStackAdapter(statemachine.Adapter):
         if obj is None:
             return obj
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 if fip.status == 'DOWN':
                     return None
                 return fip
@@ -774,10 +910,10 @@ class OpenStackAdapter(statemachine.Adapter):
         # In later versions it returns the information we're looking for.
         # If we get the information we want, we do not need to try to
         # lookup the flavor in our list.
-        if hasattr(server.flavor, 'vcpus'):
-            return server.flavor
+        if hasattr(server['flavor'], 'vcpus'):
+            return server['flavor']
         else:
-            return self._findFlavorById(server.flavor.id)
+            return self._findFlavorById(server['flavor']['id'])
 
     # The port cleanup logic.  We don't get tags or metadata, so we
     # have to figure this out on our own.
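Note on _getNextLink() above (not part of the patch): nova may
advertise the next page either in the response body ("servers_links")
or via an RFC 5988 Link header, and the query string is split off the
link so parameters are re-sent via `params` rather than accumulating
on the URL.  A self-contained sketch of the body-link case, with
sample data invented for illustration:

    import urllib.parse

    data = {
        'servers': [{'id': 'aaa'}, {'id': 'bbb'}],
        'servers_links': [
            {'rel': 'next',
             'href': 'https://compute.example/v2.1/servers/detail?marker=bbb'},
        ],
    }

    # Find the body-advertised next link, as _getNextLink does first.
    next_link = None
    for item in data.get('servers_links', []):
        if item.get('rel') == 'next' and 'href' in item:
            next_link = item['href']
            break

    # Separate the query string from the path so the marker travels
    # in `params` on the next request instead of growing the URL.
    parts = urllib.parse.urlparse(next_link)
    params = urllib.parse.parse_qs(parts.query)
    next_uri = urllib.parse.urljoin(next_link, parts.path)

    assert next_uri == 'https://compute.example/v2.1/servers/detail'
    assert params == {'marker': ['bbb']}
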
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 5f6586f37..97540ff24 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -592,10 +592,12 @@ class DBTestCase(BaseTestCase):
                            interval=1):
             if hasattr(manager, 'adapter'):
                 servers = manager.adapter._listServers()
+                server_ids = [s['id'] for s in servers]
             else:
                 # TODO: remove once all drivers use statemachine
                 servers = manager.listNodes()
-            if not (instance_id in [s.id for s in servers]):
+                server_ids = [s.id for s in servers]
+            if not (instance_id in server_ids):
                 break
 
     def waitForNodeRequestLockDeletion(self, request_id):
diff --git a/nodepool/tests/fixtures/node_max_hold_age_2.yaml b/nodepool/tests/fixtures/node_max_hold_age_2.yaml
index 7deb8ad18..beb4e2a3b 100644
--- a/nodepool/tests/fixtures/node_max_hold_age_2.yaml
+++ b/nodepool/tests/fixtures/node_max_hold_age_2.yaml
@@ -2,7 +2,7 @@ elements-dir: .
 images-dir: '{images_dir}'
 build-log-dir: '{build_log_dir}'
 
-max-hold-age: 10
+max-hold-age: 20
 
 zookeeper-servers:
   - host: {zookeeper_host}
diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py
index e1345161e..dac293815 100644
--- a/nodepool/tests/unit/test_launcher.py
+++ b/nodepool/tests/unit/test_launcher.py
@@ -100,11 +100,12 @@ class TestLauncher(tests.DBTestCase):
             provider = pool.getProviderManager('fake-provider')
             cloud_node = provider.adapter._getServer(node.external_id)
             self.assertEqual(
-                cloud_node.metadata['nodepool_provider_name'],
+                cloud_node['metadata']['nodepool_provider_name'],
                 'fake-provider')
-            self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-            self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-            self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+            self.assertEqual(cloud_node['metadata']['nodepool_pool_name'],
+                             'main')
+            self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+            self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                              'Tenant is tenant-1')
 
             self.zk.lockNode(node, blocking=False)
@@ -736,11 +737,11 @@ class TestLauncher(tests.DBTestCase):
         provider = pool.getProviderManager('fake-provider')
         cloud_node = provider.adapter._getServer(nodes[0].external_id)
         self.assertEqual(
-            cloud_node.metadata['nodepool_provider_name'],
+            cloud_node['metadata']['nodepool_provider_name'],
            'fake-provider')
-        self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-        self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-        self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+        self.assertEqual(cloud_node['metadata']['nodepool_pool_name'], 'main')
+        self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+        self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                          'Tenant is None')
 
     def test_node_network_cli(self):
@@ -1110,9 +1111,9 @@ class TestLauncher(tests.DBTestCase):
 
         # Get fake cloud record and set status to DELETING
         manager = pool.getProviderManager('fake-provider')
-        for instance in manager.adapter._listServers():
+        for instance in manager.adapter._client._server_list:
             if instance.id == nodes[0].external_id:
-                instance.status = 'DELETED'
+                instance['status'] = 'DELETED'
                 break
 
         nodes[0].state = zk.DELETING
@@ -1123,7 +1124,7 @@ class TestLauncher(tests.DBTestCase):
 
         api_record_remains = False
         for instance in manager.adapter._listServers():
-            if instance.id == nodes[0].external_id:
+            if instance['id'] == nodes[0].external_id:
                 api_record_remains = True
                 break
         self.assertTrue(api_record_remains)