Merge "Use low-level OpenStack SDK calls for server listing"
commit cf271480be
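The change below replaces openstacksdk's high-level list_servers() with a direct GET against the compute API's /servers/detail endpoint, following pagination links manually and wrapping each raw record in a thin UserDict. As a rough illustration only (not the nodepool code itself), the core listing loop looks something like the sketch below, assuming a session object that exposes get(uri, headers, params) the way the FakeSession added in this change does:

# Illustrative sketch: a simplified version of the pagination walk performed
# by _simpleServerList() in the diff below.  'session' is assumed to behave
# like a keystoneauth compute session (or the FakeSession in this change).
def list_servers_detail(session):
    uri = '/servers/detail'
    params = {}
    servers = []
    while uri:
        resp = session.get(uri, headers={"Accept": "application/json"},
                           params=params)
        data = resp.json()
        servers.extend(data['servers'])
        # Follow a 'next' pagination link if the API provided one.
        uri = None
        for link in data.get('servers_links', []):
            if link.get('rel') == 'next':
                uri = link['href']
                params = {}  # the next link carries its own query string
                break
    return servers

This sketch omits the marker/limit bookkeeping and endless-loop detection that the real _simpleServerList()/_getNextLink() pair in the diff implements.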
@@ -47,6 +47,15 @@ class Dummy(object):
         except AttributeError:
             pass
 
+    def _get_dict(self):
+        data = {}
+        for k in self.__kw.keys():
+            data[k] = getattr(self, k)
+        data.pop('event')
+        data.pop('_kw')
+        data.pop('manager')
+        return data
+
     def __repr__(self):
         args = []
         for k in self.__kw.keys():
@@ -69,6 +78,36 @@ class Dummy(object):
     def set(self, key, value):
         setattr(self, key, value)
 
+    def copy(self):
+        data = self._get_dict()
+        return Dummy(self.__kind, **data)
+
+
+class FakeResponse:
+    def __init__(self, data):
+        self._data = data
+        self.links = []
+
+    def json(self):
+        return self._data
+
+
+class FakeSession:
+    def __init__(self, cloud):
+        self.cloud = cloud
+
+    def get(self, uri, headers, params):
+        if uri == '/servers/detail':
+            server_list = []
+            for server in self.cloud._server_list:
+                data = server._get_dict()
+                data['hostId'] = data.pop('host_id')
+                data['OS-EXT-AZ:availability_zone'] = data.pop('location').zone
+                data['os-extended-volumes:volumes_attached'] =\
+                    data.pop('volumes')
+                server_list.append(data)
+            return FakeResponse({'servers': server_list})
+
 
 class FakeOpenStackCloud(object):
     log = logging.getLogger("nodepool.FakeOpenStackCloud")
@@ -82,6 +121,7 @@ class FakeOpenStackCloud(object):
         return 100, 1000000
 
     def __init__(self, images=None, networks=None):
+        self.compute = FakeSession(self)
         self.pause_creates = False
         self._image_list = images
         if self._image_list is None:
@@ -225,7 +265,7 @@ class FakeOpenStackCloud(object):
                 name='FakeProvider create',
                 args=(s, 0.1, done_status))
             t.start()
-        return s
+        return s.copy()
 
     def _delete(self, name_or_id, instance_list):
         self.log.debug("Delete from %s" % (repr(instance_list),))
@@ -388,8 +428,9 @@ class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud):
             *args, **kwargs,
             done_status='ERROR')
         # Don't wait for the async update
         server.status = 'ERROR'
         server.fault = {'message': 'quota server fault'}
+        orig_server = self._get(server.id, self._server_list)
         orig_server.status = 'ERROR'
         orig_server.fault = {'message': 'quota server fault'}
         raise OpenStackCloudCreateException('server', server.id)
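Note that FakeSession.get() renders each fake server using the raw compute API field names rather than the attribute names carried by the Dummy objects, since the adapter now parses the wire format directly. A hypothetical single entry in the fake /servers/detail payload (values made up for illustration) would carry keys like these:

# Hypothetical payload entry showing the wire-format keys that
# OpenStackInstance reads in the adapter changes below:
fake_server = {
    'id': 'fake-server-0',
    'status': 'ACTIVE',
    'hostId': 'fake-host',                        # from the Dummy's host_id
    'OS-EXT-AZ:availability_zone': 'az1',         # from location.zone
    'os-extended-volumes:volumes_attached': [],   # from volumes
    'metadata': {'nodepool_provider_name': 'fake-provider'},
}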
@@ -17,11 +17,13 @@
 # limitations under the License.
 
 from concurrent.futures import ThreadPoolExecutor
+from collections import UserDict
 import functools
 import logging
 import math
 import time
 import operator
+import urllib
 
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
@@ -66,21 +68,68 @@ def quota_from_limits(compute, volume):
     return QuotaInformation(**args)
 
 
+class NodepoolOpenStackServer(UserDict):
+    # Most of OpenStackSDK is designed around a dictionary interface,
+    # but due to the historic use of the Munch object, some code
+    # (add_server_interfaces) accesses values by attribute instead of
+    # key. For just those values, we provide getters and setters.
+
+    @property
+    def access_ipv4(self):
+        return self.data.get('access_ipv4')
+
+    @access_ipv4.setter
+    def access_ipv4(self, value):
+        self.data['access_ipv4'] = value
+
+    @property
+    def public_v4(self):
+        return self.data.get('public_v4')
+
+    @public_v4.setter
+    def public_v4(self, value):
+        self.data['public_v4'] = value
+
+    @property
+    def private_v4(self):
+        return self.data.get('private_v4')
+
+    @private_v4.setter
+    def private_v4(self, value):
+        self.data['private_v4'] = value
+
+    @property
+    def access_ipv6(self):
+        return self.data.get('access_ipv6')
+
+    @access_ipv6.setter
+    def access_ipv6(self, value):
+        self.data['access_ipv6'] = value
+
+    @property
+    def public_v6(self):
+        return self.data.get('public_v6')
+
+    @public_v6.setter
+    def public_v6(self, value):
+        self.data['public_v6'] = value
+
+
 class OpenStackInstance(statemachine.Instance):
     def __init__(self, provider, server, quota):
         super().__init__()
-        self.external_id = server.id
-        self.metadata = server.metadata
-        self.private_ipv4 = server.private_v4
+        self.external_id = server['id']
+        self.metadata = server.get('metadata', {})
+        self.private_ipv4 = server.get('private_v4')
         self.private_ipv6 = None
-        self.public_ipv4 = server.public_v4
-        self.public_ipv6 = server.public_v6
-        self.host_id = server.host_id
+        self.public_ipv4 = server.get('public_v4')
+        self.public_ipv6 = server.get('public_v6')
+        self.host_id = server['hostId']
         self.cloud = provider.cloud_config.name
         self.region = provider.region_name
-        self.az = server.location.zone
+        self.az = server.get('OS-EXT-AZ:availability_zone')
 
-        self.interface_ip = server.interface_ip
+        self.interface_ip = server.get('interface_ip')
         # TODO: this doesn't match the behavior of other drivers
         # but is here for backwards compatibility.
         self.private_ipv4 = self.private_ipv4 or self.public_ipv4
@@ -118,7 +167,7 @@ class OpenStackDeleteStateMachine(statemachine.StateMachine):
             self.server = self.adapter._getServer(self.external_id)
             if (self.server and
                     self.adapter._hasFloatingIps() and
-                    self.server.addresses):
+                    self.server.get('addresses')):
                 self.floating_ips = self.adapter._getFloatingIps(self.server)
                 for fip in self.floating_ips:
                     self.adapter._deleteFloatingIp(fip)
@@ -274,7 +323,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                 self.server = self.adapter._completeApi(self.create_future)
                 if self.server is None:
                     return
-                self.external_id = self.server.id
+                self.external_id = self.server['id']
                 self.state = self.SERVER_CREATING
             except openstack.cloud.exc.OpenStackCloudCreateException as e:
                 if e.resource_id:
@@ -295,7 +344,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
         if self.state == self.SERVER_CREATING:
             self.server = self.adapter._refreshServer(self.server)
 
-            if self.server.status == 'ACTIVE':
+            if self.server['status'] == 'ACTIVE':
                 if (self.label.pool.auto_floating_ip and
                         self.adapter._needsFloatingIp(self.server)):
                     self.floating_ip = self.adapter._createFloatingIp(
@@ -303,7 +352,7 @@ class OpenStackCreateStateMachine(statemachine.StateMachine):
                     self.state = self.FLOATING_IP_CREATING
                 else:
                     self.state = self.COMPLETE
-            elif self.server.status == 'ERROR':
+            elif self.server['status'] == 'ERROR':
                 if ('fault' in self.server and self.server['fault'] is not None
                         and 'message' in self.server['fault']):
                     self.log.error(
@@ -404,10 +453,11 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def listResources(self):
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
-            yield OpenStackResource(server.metadata,
-                                    OpenStackResource.TYPE_INSTANCE, server.id)
+            yield OpenStackResource(server.get('metadata', {}),
+                                    OpenStackResource.TYPE_INSTANCE,
+                                    server['id'])
         # Floating IP and port leakage can't be handled by the
         # automatic resource cleanup in cleanupLeakedResources because
         # openstack doesn't store metadata on those objects, so we
@@ -427,12 +477,13 @@ class OpenStackAdapter(statemachine.Adapter):
         for volume in self._listVolumes():
             volumes[volume.id] = volume
         for server in self._listServers():
-            if server.status.lower() == 'deleted':
+            if server['status'].lower() == 'deleted':
                 continue
             flavor = self._getFlavorFromServer(server)
             server_volumes = []
-            for vattach in server.volumes:
-                volume = volumes.get(vattach.id)
+            for vattach in server.get(
+                    'os-extended-volumes:volumes_attached', []):
+                volume = volumes.get(vattach['id'])
                 if volume:
                     server_volumes.append(volume)
             quota = quota_from_flavor(flavor, volumes=server_volumes)
@@ -664,9 +715,94 @@ class OpenStackAdapter(statemachine.Adapter):
                 name, self.provider.name))
         return network
 
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _simpleServerList(self):
+        session = self._client.compute
+        limit = None
+        query_params = {}
+        uri = '/servers/detail'
+        ret = []
+        while uri:
+            response = session.get(
+                uri,
+                headers={"Accept": "application/json"},
+                params=query_params,
+            )
+            data = response.json()
+
+            last_marker = query_params.pop('marker', None)
+            query_params.pop('limit', None)
+
+            resources = data['servers']
+            if not isinstance(resources, list):
+                resources = [resources]
+
+            ret += [NodepoolOpenStackServer(x) for x in resources]
+
+            if resources:
+                marker = resources[-1]['id']
+                uri, next_params = self._getNextLink(
+                    uri, response, data, marker, limit
+                )
+                try:
+                    if next_params['marker'] == last_marker:
+                        raise Exception(
+                            'Endless pagination loop detected, aborting'
+                        )
+                except KeyError:
+                    pass
+                query_params.update(next_params)
+            else:
+                break
+        return ret
+
+    # This method is based on code from OpenStackSDK, licensed
+    # under ASL2.
+    def _getNextLink(self, uri, response, data, marker, limit):
+        pagination_key = 'servers_links'
+        next_link = None
+        params = {}
+
+        if isinstance(data, dict):
+            links = data.get(pagination_key, {})
+
+            for item in links:
+                if item.get('rel') == 'next' and 'href' in item:
+                    next_link = item['href']
+                    break
+
+            if next_link and next_link.startswith('/v'):
+                next_link = next_link[next_link.find('/', 1):]
+
+        if not next_link and 'next' in response.links:
+            # RFC5988 specifies Link headers and requests parses them if they
+            # are there. We prefer link dicts in resource body, but if those
+            # aren't there and Link headers are, use them.
+            next_link = response.links['next']['uri']
+
+        # Parse params from Link (next page URL) into params.
+        # This prevents duplication of query parameters that with large
+        # number of pages result in HTTP 414 error eventually.
+        if next_link:
+            parts = urllib.parse.urlparse(next_link)
+            query_params = urllib.parse.parse_qs(parts.query)
+            params.update(query_params)
+            next_link = urllib.parse.urljoin(next_link, parts.path)
+
+        # If we still have no link, and limit was given and is non-zero,
+        # and the number of records yielded equals the limit, then the user
+        # is playing pagination ball so we should go ahead and try once more.
+        if not next_link and limit:
+            next_link = uri
+            params['marker'] = marker
+            params['limit'] = limit
+
+        return next_link, params
+
     def _listServers(self):
-        with Timer(self.log, 'API call list_servers'):
-            return self._client.list_servers(bare=True)
+        with Timer(self.log, 'API call detailed server list'):
+            return self._simpleServerList()
 
     def _listVolumes(self):
         try:
@@ -680,7 +816,7 @@ class OpenStackAdapter(statemachine.Adapter):
             return self._client.list_floating_ips()
 
     def _refreshServer(self, obj):
-        ret = self._getServer(obj.id)
+        ret = self._getServer(obj['id'])
         if ret:
             return ret
         return obj
@@ -691,8 +827,8 @@ class OpenStackAdapter(statemachine.Adapter):
 
     def _getServer(self, external_id):
         for server in self._listServers():
-            if server.id == external_id:
-                if server.status in ['ACTIVE', 'ERROR']:
+            if server['id'] == external_id:
+                if server['status'] in ['ACTIVE', 'ERROR']:
                     return self._expandServer(server)
                 return server
         return None
@@ -707,15 +843,15 @@ class OpenStackAdapter(statemachine.Adapter):
         if obj is None:
             return obj
         for server in self._listServers():
-            if server.id == obj.id:
-                if server.status.lower() == 'deleted':
+            if server['id'] == obj['id']:
+                if server['status'].lower() == 'deleted':
                     return None
                 return server
         return None
 
     def _refreshFloatingIp(self, obj):
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 return fip
         return obj
 
@@ -723,7 +859,7 @@ class OpenStackAdapter(statemachine.Adapter):
         if obj is None:
             return obj
         for fip in self._listFloatingIps():
-            if fip.id == obj.id:
+            if fip['id'] == obj['id']:
                 if fip.status == 'DOWN':
                     return None
                 return fip
@@ -774,10 +910,10 @@ class OpenStackAdapter(statemachine.Adapter):
         # In later versions it returns the information we're looking for.
         # If we get the information we want, we do not need to try to
        # lookup the flavor in our list.
-        if hasattr(server.flavor, 'vcpus'):
-            return server.flavor
+        if hasattr(server['flavor'], 'vcpus'):
+            return server['flavor']
         else:
-            return self._findFlavorById(server.flavor.id)
+            return self._findFlavorById(server['flavor']['id'])
 
     # The port cleanup logic. We don't get tags or metadata, so we
     # have to figure this out on our own.
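As a usage note: NodepoolOpenStackServer keeps the dictionary interface that the adapter now relies on, while the properties defined in the hunk above preserve the few attribute-style accesses (e.g. from add_server_interfaces). A minimal, hypothetical stand-in (not the nodepool class itself) illustrates the pattern:

# Minimal stand-in mirroring the UserDict pattern used by NodepoolOpenStackServer.
from collections import UserDict

class ServerRecord(UserDict):
    @property
    def public_v4(self):
        return self.data.get('public_v4')

    @public_v4.setter
    def public_v4(self, value):
        self.data['public_v4'] = value

s = ServerRecord({'id': 'abc123', 'status': 'ACTIVE'})
s.public_v4 = '203.0.113.5'          # attribute-style write still works
print(s['status'], s['public_v4'])   # dict-style access is the primary interface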
@@ -592,10 +592,12 @@ class DBTestCase(BaseTestCase):
                                     interval=1):
             if hasattr(manager, 'adapter'):
                 servers = manager.adapter._listServers()
+                server_ids = [s['id'] for s in servers]
             else:
                 # TODO: remove once all drivers use statemachine
                 servers = manager.listNodes()
-            if not (instance_id in [s.id for s in servers]):
+                server_ids = [s.id for s in servers]
+            if not (instance_id in server_ids):
                 break
 
     def waitForNodeRequestLockDeletion(self, request_id):
@@ -2,7 +2,7 @@ elements-dir: .
 images-dir: '{images_dir}'
 build-log-dir: '{build_log_dir}'
 
-max-hold-age: 10
+max-hold-age: 20
 
 zookeeper-servers:
   - host: {zookeeper_host}
@@ -100,11 +100,12 @@ class TestLauncher(tests.DBTestCase):
         provider = pool.getProviderManager('fake-provider')
         cloud_node = provider.adapter._getServer(node.external_id)
         self.assertEqual(
-            cloud_node.metadata['nodepool_provider_name'],
+            cloud_node['metadata']['nodepool_provider_name'],
             'fake-provider')
-        self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-        self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-        self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+        self.assertEqual(cloud_node['metadata']['nodepool_pool_name'],
+                         'main')
+        self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+        self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                          'Tenant is tenant-1')
 
         self.zk.lockNode(node, blocking=False)
@@ -736,11 +737,11 @@ class TestLauncher(tests.DBTestCase):
         provider = pool.getProviderManager('fake-provider')
         cloud_node = provider.adapter._getServer(nodes[0].external_id)
         self.assertEqual(
-            cloud_node.metadata['nodepool_provider_name'],
+            cloud_node['metadata']['nodepool_provider_name'],
             'fake-provider')
-        self.assertEqual(cloud_node.metadata['nodepool_pool_name'], 'main')
-        self.assertEqual(cloud_node.metadata['prop1'], 'foo')
-        self.assertEqual(cloud_node.metadata['dynamic-tenant'],
+        self.assertEqual(cloud_node['metadata']['nodepool_pool_name'], 'main')
+        self.assertEqual(cloud_node['metadata']['prop1'], 'foo')
+        self.assertEqual(cloud_node['metadata']['dynamic-tenant'],
                          'Tenant is None')
 
     def test_node_network_cli(self):
@@ -1110,9 +1111,9 @@ class TestLauncher(tests.DBTestCase):
 
         # Get fake cloud record and set status to DELETING
         manager = pool.getProviderManager('fake-provider')
-        for instance in manager.adapter._listServers():
+        for instance in manager.adapter._client._server_list:
             if instance.id == nodes[0].external_id:
-                instance.status = 'DELETED'
+                instance['status'] = 'DELETED'
                 break
 
         nodes[0].state = zk.DELETING
@@ -1123,7 +1124,7 @@ class TestLauncher(tests.DBTestCase):
 
         api_record_remains = False
         for instance in manager.adapter._listServers():
-            if instance.id == nodes[0].external_id:
+            if instance['id'] == nodes[0].external_id:
                 api_record_remains = True
                 break
         self.assertTrue(api_record_remains)