Use low-level OpenStack SDK calls for server listing

The OpenStack SDK performs a lot of processing on the JSON data
returned by nova, and on large server lists, this can dwarf the
actual time needed to receive and parse the JSON.

Nodepool uses very little of this information, so let's use the
keystoneauth session to get a simple JSON list.

The Server object that SDK normally returns is a hybrid object
that provides both attributes and dictionary keys.  One method
that we call has some lingering references to accessors, so we
create a UserDict subclass to handle those. Nodepool-internal
references are updated from attributes to dictionary keys.

Change-Id: Iecc5976858e8d2ee6894a521f6a30f10ae9c6177
This commit is contained in:
James E. Blair 2023-07-05 15:17:04 -07:00
parent 128f7bf237
commit 066699f88a
5 changed files with 226 additions and 46 deletions

View File

@ -47,6 +47,15 @@ class Dummy(object):
except AttributeError:
pass
def _get_dict(self):
data = {}
for k in self.__kw.keys():
data[k] = getattr(self, k)
data.pop('event')
data.pop('_kw')
data.pop('manager')
return data
def __repr__(self):
args = []
for k in self.__kw.keys():
@ -69,6 +78,36 @@ class Dummy(object):
def set(self, key, value):
    """Assign *value* to the attribute named *key* on this object."""
    self.__setattr__(key, value)
def copy(self):
    """Return a detached Dummy with the same kind and attribute values."""
    return Dummy(self.__kind, **self._get_dict())
class FakeResponse:
    """Minimal stand-in for a keystoneauth/requests response.

    Exposes just enough surface for the simple server-list code:
    a ``json()`` method returning the payload, and an empty
    ``links`` mapping (no RFC5988 pagination headers).
    """

    def __init__(self, data):
        self.links = []
        self._data = data

    def json(self):
        return self._data
class FakeSession:
    """Fake keystoneauth session serving raw JSON-style server data.

    Only the detailed server listing endpoint is emulated; any other
    URI yields None.
    """

    def __init__(self, cloud):
        self.cloud = cloud

    def get(self, uri, headers, params):
        if uri != '/servers/detail':
            return None
        servers = []
        for server in self.cloud._server_list:
            data = server._get_dict()
            # Rename internal fields to the raw nova API keys.
            data['hostId'] = data.pop('host_id')
            location = data.pop('location')
            data['OS-EXT-AZ:availability_zone'] = location.zone
            data['os-extended-volumes:volumes_attached'] = data.pop('volumes')
            servers.append(data)
        return FakeResponse({'servers': servers})
class FakeOpenStackCloud(object):
log = logging.getLogger("nodepool.FakeOpenStackCloud")
@ -82,6 +121,7 @@ class FakeOpenStackCloud(object):
return 100, 1000000
def __init__(self, images=None, networks=None):
self.compute = FakeSession(self)
self.pause_creates = False
self._image_list = images
if self._image_list is None:
@ -225,7 +265,7 @@ class FakeOpenStackCloud(object):
name='FakeProvider create',
args=(s, 0.1, done_status))
t.start()
return s
return s.copy()
def _delete(self, name_or_id, instance_list):
self.log.debug("Delete from %s" % (repr(instance_list),))
@ -388,8 +428,9 @@ class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud):
*args, **kwargs,
done_status='ERROR')
# Don't wait for the async update
server.status = 'ERROR'
server.fault = {'message': 'quota server fault'}
orig_server = self._get(server.id, self._server_list)
orig_server.status = 'ERROR'
orig_server.fault = {'message': 'quota server fault'}
raise OpenStackCloudCreateException('server', server.id)

View File

@ -17,11 +17,13 @@
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
from collections import UserDict
import functools
import logging
import math
import operator
import time
import urllib
import urllib.parse

import openstack
from keystoneauth1.exceptions.catalog import EndpointNotFound
@ -66,21 +68,68 @@ def quota_from_limits(compute, volume):
return QuotaInformation(**args)
class NodepoolOpenStackServer(UserDict):
    """A plain-dict server record with a few attribute accessors.

    Most of OpenStackSDK is designed around a dictionary interface,
    but due to the historic use of the Munch object, some code
    (add_server_interfaces) accesses values by attribute instead of
    key.  For just those values, we provide properties that proxy
    reads and writes to the underlying dict.
    """

    def _dict_backed(key):
        # Factory building a property that forwards attribute access
        # for ``key`` to the UserDict storage; missing keys read as
        # None (matching dict.get semantics of the original getters).
        def _get(self):
            return self.data.get(key)

        def _set(self, value):
            self.data[key] = value

        return property(_get, _set)

    access_ipv4 = _dict_backed('access_ipv4')
    public_v4 = _dict_backed('public_v4')
    private_v4 = _dict_backed('private_v4')
    access_ipv6 = _dict_backed('access_ipv6')
    public_v6 = _dict_backed('public_v6')

    # The factory is only needed while building the class body.
    del _dict_backed
class OpenStackInstance(statemachine.Instance):
def __init__(self, provider, server, quota):
super().__init__()
self.external_id = server.id
self.metadata = server.metadata
self.private_ipv4 = server.private_v4
self.external_id = server['id']
self.metadata = server.get('metadata', {})
self.private_ipv4 = server.get('private_v4')
self.private_ipv6 = None
self.public_ipv4 = server.public_v4
self.public_ipv6 = server.public_v6
self.host_id = server.host_id
self.public_ipv4 = server.get('public_v4')
self.public_ipv6 = server.get('public_v6')
self.host_id = server['hostId']
self.cloud = provider.cloud_config.name
self.region = provider.region_name
self.az = server.location.zone
self.az = server.get('OS-EXT-AZ:availability_zone')
self.interface_ip = server.interface_ip
self.interface_ip = server.get('interface_ip')
# TODO: this doesn't match the behavior of other drivers
# but is here for backwards compatibility.
self.private_ipv4 = self.private_ipv4 or self.public_ipv4
@ -118,7 +167,7 @@ class OpenStackDeleteStateMachine(statemachine.StateMachine):
self.server = self.adapter._getServer(self.external_id)
if (self.server and
self.adapter._hasFloatingIps() and
self.server.addresses):
self.server.get('addresses')):
self.floating_ips = self.adapter._getFloatingIps(self.server)
for fip in self.floating_ips:
self.adapter._deleteFloatingIp(fip)
@ -274,7 +323,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
self.server = self.adapter._completeApi(self.create_future)
if self.server is None:
return
self.external_id = self.server.id
self.external_id = self.server['id']
self.state = self.SERVER_CREATING
except openstack.cloud.exc.OpenStackCloudCreateException as e:
if e.resource_id:
@ -295,7 +344,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
if self.state == self.SERVER_CREATING:
self.server = self.adapter._refreshServer(self.server)
if self.server.status == 'ACTIVE':
if self.server['status'] == 'ACTIVE':
if (self.label.pool.auto_floating_ip and
self.adapter._needsFloatingIp(self.server)):
self.floating_ip = self.adapter._createFloatingIp(
@ -303,7 +352,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
self.state = self.FLOATING_IP_CREATING
else:
self.state = self.COMPLETE
elif self.server.status == 'ERROR':
elif self.server['status'] == 'ERROR':
if ('fault' in self.server and self.server['fault'] is not None
and 'message' in self.server['fault']):
self.log.error(
@ -404,10 +453,11 @@ class OpenStackAdapter(statemachine.Adapter):
def listResources(self):
for server in self._listServers():
if server.status.lower() == 'deleted':
if server['status'].lower() == 'deleted':
continue
yield OpenStackResource(server.metadata,
OpenStackResource.TYPE_INSTANCE, server.id)
yield OpenStackResource(server.get('metadata', {}),
OpenStackResource.TYPE_INSTANCE,
server['id'])
# Floating IP and port leakage can't be handled by the
# automatic resource cleanup in cleanupLeakedResources because
# openstack doesn't store metadata on those objects, so we
@ -427,12 +477,13 @@ class OpenStackAdapter(statemachine.Adapter):
for volume in self._listVolumes():
volumes[volume.id] = volume
for server in self._listServers():
if server.status.lower() == 'deleted':
if server['status'].lower() == 'deleted':
continue
flavor = self._getFlavorFromServer(server)
server_volumes = []
for vattach in server.volumes:
volume = volumes.get(vattach.id)
for vattach in server.get(
'os-extended-volumes:volumes_attached', []):
volume = volumes.get(vattach['id'])
if volume:
server_volumes.append(volume)
quota = quota_from_flavor(flavor, volumes=server_volumes)
@ -664,9 +715,94 @@ class OpenStackAdapter(statemachine.Adapter):
name, self.provider.name))
return network
# This method is based on code from OpenStackSDK, licensed
# under ASL2.
def _simpleServerList(self):
    """Fetch the detailed server list as lightweight dict objects.

    Pages through ``/servers/detail`` using the keystoneauth session
    directly (bypassing SDK object construction for speed) and wraps
    each raw server dict in a NodepoolOpenStackServer.

    :returns: list of NodepoolOpenStackServer
    :raises Exception: if pagination returns the same marker twice
        (an endless pagination loop).
    """
    session = self._client.compute
    # We never request an explicit page size; limit stays None and
    # only matters if the server echoes one back via _getNextLink.
    limit = None
    query_params = {}
    uri = '/servers/detail'
    ret = []
    while uri:
        response = session.get(
            uri,
            headers={"Accept": "application/json"},
            params=query_params,
        )
        data = response.json()
        # Remember the marker used for this request so we can detect
        # a server that keeps handing back the same page.
        last_marker = query_params.pop('marker', None)
        query_params.pop('limit', None)
        resources = data['servers']
        if not isinstance(resources, list):
            resources = [resources]
        ret += [NodepoolOpenStackServer(x) for x in resources]
        if resources:
            # Use the last record's id as the pagination marker.
            marker = resources[-1]['id']
            uri, next_params = self._getNextLink(
                uri, response, data, marker, limit
            )
            try:
                if next_params['marker'] == last_marker:
                    raise Exception(
                        'Endless pagination loop detected, aborting'
                    )
            except KeyError:
                # No marker in the next page's params; nothing to
                # compare against.
                pass
            query_params.update(next_params)
        else:
            break
    return ret
# This method is based on code from OpenStackSDK, licensed
# under ASL2.
def _getNextLink(self, uri, response, data, marker, limit):
pagination_key = 'servers_links'
next_link = None
params = {}
if isinstance(data, dict):
links = data.get(pagination_key, {})
for item in links:
if item.get('rel') == 'next' and 'href' in item:
next_link = item['href']
break
if next_link and next_link.startswith('/v'):
next_link = next_link[next_link.find('/', 1):]
if not next_link and 'next' in response.links:
# RFC5988 specifies Link headers and requests parses them if they
# are there. We prefer link dicts in resource body, but if those
# aren't there and Link headers are, use them.
next_link = response.links['next']['uri']
# Parse params from Link (next page URL) into params.
# This prevents duplication of query parameters that with large
# number of pages result in HTTP 414 error eventually.
if next_link:
parts = urllib.parse.urlparse(next_link)
query_params = urllib.parse.parse_qs(parts.query)
params.update(query_params)
next_link = urllib.parse.urljoin(next_link, parts.path)
# If we still have no link, and limit was given and is non-zero,
# and the number of records yielded equals the limit, then the user
# is playing pagination ball so we should go ahead and try once more.
if not next_link and limit:
next_link = uri
params['marker'] = marker
params['limit'] = limit
return next_link, params
def _listServers(self):
with Timer(self.log, 'API call list_servers'):
return self._client.list_servers(bare=True)
with Timer(self.log, 'API call detailed server list'):
return self._simpleServerList()
def _listVolumes(self):
try:
@ -680,7 +816,7 @@ class OpenStackAdapter(statemachine.Adapter):
return self._client.list_floating_ips()
def _refreshServer(self, obj):
ret = self._getServer(obj.id)
ret = self._getServer(obj['id'])
if ret:
return ret
return obj
@ -691,8 +827,8 @@ class OpenStackAdapter(statemachine.Adapter):
def _getServer(self, external_id):
for server in self._listServers():
if server.id == external_id:
if server.status in ['ACTIVE', 'ERROR']:
if server['id'] == external_id:
if server['status'] in ['ACTIVE', 'ERROR']:
return self._expandServer(server)
return server
return None
@ -707,15 +843,15 @@ class OpenStackAdapter(statemachine.Adapter):
if obj is None:
return obj
for server in self._listServers():
if server.id == obj.id:
if server.status.lower() == 'deleted':
if server['id'] == obj['id']:
if server['status'].lower() == 'deleted':
return None
return server
return None
def _refreshFloatingIp(self, obj):
for fip in self._listFloatingIps():
if fip.id == obj.id:
if fip['id'] == obj['id']:
return fip
return obj
@ -723,7 +859,7 @@ class OpenStackAdapter(statemachine.Adapter):
if obj is None:
return obj
for fip in self._listFloatingIps():
if fip.id == obj.id:
if fip['id'] == obj['id']:
if fip.status == 'DOWN':
return None
return fip
@ -774,10 +910,10 @@ class OpenStackAdapter(statemachine.Adapter):
# In later versions it returns the information we're looking for.
# If we get the information we want, we do not need to try to
# lookup the flavor in our list.
if hasattr(server.flavor, 'vcpus'):
return server.flavor
if hasattr(server['flavor'], 'vcpus'):
return server['flavor']
else:
return self._findFlavorById(server.flavor.id)
return self._findFlavorById(server['flavor']['id'])
# The port cleanup logic. We don't get tags or metadata, so we
# have to figure this out on our own.

View File

@ -592,10 +592,12 @@ class DBTestCase(BaseTestCase):
interval=1):
if hasattr(manager, 'adapter'):
servers = manager.adapter._listServers()
server_ids = [s['id'] for s in servers]
else:
# TODO: remove once all drivers use statemachine
servers = manager.listNodes()
if not (instance_id in [s.id for s in servers]):
server_ids = [s.id for s in servers]
if not (instance_id in server_ids):
break
def waitForNodeRequestLockDeletion(self, request_id):

View File

@ -2,7 +2,7 @@ elements-dir: .
images-dir: '{images_dir}'
build-log-dir: '{build_log_dir}'
max-hold-age: 10
max-hold-age: 20
zookeeper-servers:
- host: {zookeeper_host}

View File

@ -100,11 +100,12 @@ class TestLauncher(tests.DBTestCase):
provider = pool.getProviderManager('fake-provider')
cloud_node = provider.adapter._getServer(node.external_id)
self.assertEqual(
cloud_node.metadata['nodepool_provider_name'],
cloud_node['metadata']['nodepool_provider_name'],
'fake-provider')
self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
self.assertEqual(cloud_node.metadata['prop1'], 'foo')
self.assertEqual(cloud_node.metadata['dynamic-tenant'],
self.assertEqual(cloud_node['metadata']['nodepool_pool_name'],
'main')
self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
'Tenant is tenant-1')
self.zk.lockNode(node, blocking=False)
@ -736,11 +737,11 @@ class TestLauncher(tests.DBTestCase):
provider = pool.getProviderManager('fake-provider')
cloud_node = provider.adapter._getServer(nodes[0].external_id)
self.assertEqual(
cloud_node.metadata['nodepool_provider_name'],
cloud_node['metadata']['nodepool_provider_name'],
'fake-provider')
self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
self.assertEqual(cloud_node.metadata['prop1'], 'foo')
self.assertEqual(cloud_node.metadata['dynamic-tenant'],
self.assertEqual(cloud_node['metadata']['nodepool_pool_name'], 'main')
self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
'Tenant is None')
def test_node_network_cli(self):
@ -1110,9 +1111,9 @@ class TestLauncher(tests.DBTestCase):
# Get fake cloud record and set status to DELETING
manager = pool.getProviderManager('fake-provider')
for instance in manager.adapter._listServers():
for instance in manager.adapter._client._server_list:
if instance.id == nodes[0].external_id:
instance.status = 'DELETED'
instance['status'] = 'DELETED'
break
nodes[0].state = zk.DELETING
@ -1123,7 +1124,7 @@ class TestLauncher(tests.DBTestCase):
api_record_remains = False
for instance in manager.adapter._listServers():
if instance.id == nodes[0].external_id:
if instance['id'] == nodes[0].external_id:
api_record_remains = True
break
self.assertTrue(api_record_remains)