Report leaked resource metrics in statemachine driver

The OpenStack driver reports some leaked metrics.  Extend that in
a generic way to all statemachine drivers.  Doing so also adds
some more metrics to the OpenStack driver.

Change-Id: I97c01b54b576f922b201b28b117d34b5ee1a597d
This commit is contained in:
James E. Blair 2023-03-28 16:07:57 -07:00
parent e096928d30
commit d4f2c8b9e7
11 changed files with 183 additions and 69 deletions

View File

@ -547,27 +547,90 @@ Provider Metrics
Number of nodes per provider that are in one specific state. See
:ref:`nodepool.nodes <nodepool_nodes>` for a list of possible states.
.. zuul:stat:: nodepool.provider.<provider>.leaked.ports
.. zuul:stat:: nodepool.provider.<provider>.leaked
This hierarchy supplies driver-dependent information about leaked
resource cleanup. Non-zero values indicate an error situation as
resources should be cleaned up automatically.
.. zuul:stat:: nodepool.provider.<provider>.leaked.amis
:type: counter
Number of ports in the DOWN state that have been removed
automatically in the cleanup resources phase of the OpenStack
driver. Non-zero values indicate an error situation as ports
should be cleaned up automatically.
Drivers: AWS
.. zuul:stat:: nodepool.provider.<provider>.leaked.instances
Number of leaked AMIs removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.disks
:type: counter
Number of nodes not correctly recorded in Zookeeper that nodepool
has cleaned up automatically. Non-zero values indicate an error
situation as instances should be cleaned automatically.
Drivers: Azure
Number of leaked disks removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.floatingips
:type: counter
Records the number of unattached floating IPs removed automatically
by nodepool. Elevated rates indicate an error situation as
floating IPs should be managed automatically.
Drivers: OpenStack, IBMVPC
Number of unattached floating IPs removed automatically by
Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.images
:type: counter
Drivers: Azure, IBMVPC
Number of leaked images removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.instances
:type: counter
Drivers: AWS, Azure, GCE, IBMVPC, OpenStack
Number of nodes not correctly recorded in Zookeeper that Nodepool
has cleaned up automatically.
.. zuul:stat:: nodepool.provider.<provider>.leaked.nics
:type: counter
Drivers: Azure
Number of leaked NICs removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.objects
:type: counter
Drivers: AWS, IBMVPC
Number of leaked storage objects removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.pips
:type: counter
Drivers: Azure
Number of leaked public IPs removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.ports
:type: counter
Drivers: OpenStack
Number of ports in the DOWN state that have been removed.
.. zuul:stat:: nodepool.provider.<provider>.leaked.snapshots
:type: counter
Drivers: AWS
Number of leaked snapshots removed automatically by Nodepool.
.. zuul:stat:: nodepool.provider.<provider>.leaked.volumes
:type: counter
Drivers: AWS
Number of leaked volumes removed automatically by Nodepool.
Launch metrics

View File

@ -108,9 +108,14 @@ class AwsInstance(statemachine.Instance):
class AwsResource(statemachine.Resource):
TYPE_INSTANCE = 'instance'
TYPE_AMI = 'ami'
TYPE_SNAPSHOT = 'snapshot'
TYPE_VOLUME = 'volume'
TYPE_OBJECT = 'object'
def __init__(self, metadata, type, id):
super().__init__(metadata)
self.type = type
super().__init__(metadata, type)
self.id = id
@ -307,7 +312,7 @@ class AwsAdapter(statemachine.Adapter):
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(instance.tags),
'instance', instance.id)
AwsResource.TYPE_INSTANCE, instance.id)
for volume in self._listVolumes():
try:
if volume.state.lower() == "deleted":
@ -315,7 +320,7 @@ class AwsAdapter(statemachine.Adapter):
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(volume.tags),
'volume', volume.id)
AwsResource.TYPE_VOLUME, volume.id)
for ami in self._listAmis():
try:
if ami.state.lower() == "deleted":
@ -323,7 +328,7 @@ class AwsAdapter(statemachine.Adapter):
except (botocore.exceptions.ClientError, AttributeError):
continue
yield AwsResource(tag_list_to_dict(ami.tags),
'ami', ami.id)
AwsResource.TYPE_AMI, ami.id)
for snap in self._listSnapshots():
try:
if snap.state.lower() == "deleted":
@ -331,7 +336,7 @@ class AwsAdapter(statemachine.Adapter):
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(snap.tags),
'snapshot', snap.id)
AwsResource.TYPE_SNAPSHOT, snap.id)
if self.provider.object_storage:
for obj in self._listObjects():
with self.non_mutating_rate_limiter:
@ -341,19 +346,19 @@ class AwsAdapter(statemachine.Adapter):
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(tags['TagSet']),
'object', obj.key)
AwsResource.TYPE_OBJECT, obj.key)
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == 'instance':
if resource.type == AwsResource.TYPE_INSTANCE:
self._deleteInstance(resource.id, immediate=True)
if resource.type == 'volume':
if resource.type == AwsResource.TYPE_VOLUME:
self._deleteVolume(resource.id)
if resource.type == 'ami':
if resource.type == AwsResource.TYPE_AMI:
self._deleteAmi(resource.id)
if resource.type == 'snapshot':
if resource.type == AwsResource.TYPE_SNAPSHOT:
self._deleteSnapshot(resource.id)
if resource.type == 'object':
if resource.type == AwsResource.TYPE_OBJECT:
self._deleteObject(resource.id)
def listInstances(self):

View File

@ -96,9 +96,14 @@ class AzureInstance(statemachine.Instance):
class AzureResource(statemachine.Resource):
TYPE_INSTANCE = 'INSTANCE'
TYPE_NIC = 'nic'
TYPE_PIP = 'pip'
TYPE_DISK = 'disk'
TYPE_IMAGE = 'image'
def __init__(self, metadata, type, name):
super().__init__(metadata)
self.type = type
super().__init__(metadata, type)
self.name = name
@ -357,27 +362,32 @@ class AzureAdapter(statemachine.Adapter):
def listResources(self):
for vm in self._listVirtualMachines():
yield AzureResource(vm.get('tags', {}), 'vm', vm['name'])
yield AzureResource(vm.get('tags', {}),
AzureResource.TYPE_INSTANCE, vm['name'])
for nic in self._listNetworkInterfaces():
yield AzureResource(nic.get('tags', {}), 'nic', nic['name'])
yield AzureResource(nic.get('tags', {}),
AzureResource.TYPE_NIC, nic['name'])
for pip in self._listPublicIPAddresses():
yield AzureResource(pip.get('tags', {}), 'pip', pip['name'])
yield AzureResource(pip.get('tags', {}),
AzureResource.TYPE_PIP, pip['name'])
for disk in self._listDisks():
yield AzureResource(disk.get('tags', {}), 'disk', disk['name'])
yield AzureResource(disk.get('tags', {}),
AzureResource.TYPE_DISK, disk['name'])
for image in self._listImages():
yield AzureResource(image.get('tags', {}), 'image', image['name'])
yield AzureResource(image.get('tags', {}),
AzureResource.TYPE_IMAGE, image['name'])
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.name}")
if resource.type == 'vm':
if resource.type == AzureResource.TYPE_INSTANCE:
crud = self.azul.virtual_machines
elif resource.type == 'nic':
elif resource.type == AzureResource.TYPE_NIC:
crud = self.azul.network_interfaces
elif resource.type == 'pip':
elif resource.type == AzureResource.TYPE_PIP:
crud = self.azul.public_ip_addresses
elif resource.type == 'disk':
elif resource.type == AzureResource.TYPE_DISK:
crud = self.azul.disks
elif resource.type == 'image':
elif resource.type == AzureResource.TYPE_IMAGE:
crud = self.azul.images
with self.rate_limiter:
crud.delete(self.resource_group, resource.name)

View File

@ -64,9 +64,10 @@ class GceInstance(statemachine.Instance):
class GceResource(statemachine.Resource):
TYPE_INSTANCE = 'instance'
def __init__(self, metadata, type, id):
super().__init__(metadata)
self.type = type
super().__init__(metadata, type)
self.id = id
@ -183,11 +184,12 @@ class GceAdapter(statemachine.Adapter):
if instance['status'] == 'TERMINATED':
continue
metadata = gce_metadata_to_dict(instance.get('metadata'))
yield GceResource(metadata, 'instance', instance['name'])
yield GceResource(metadata,
GceResource.TYPE_INSTANCE, instance['name'])
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == 'instance':
if resource.type == GceResource.TYPE_INSTANCE:
self._deleteInstance(resource.id)
def getQuotaLimits(self):

View File

@ -141,10 +141,14 @@ class IBMVPCInstance(statemachine.Instance):
class IBMVPCResource(statemachine.Resource):
TYPE_INSTANCE = 'instance'
TYPE_FIP = 'floatingip'
TYPE_IMAGE = 'image'
TYPE_OBJECT = 'object'
def __init__(self, type, obj, resource_group):
metadata = get_metadata_from_resource_group_object(resource_group, obj)
super().__init__(metadata)
self.type = type
super().__init__(metadata, type)
self.name = obj['name']
self.id = obj['id']
@ -392,15 +396,15 @@ class IBMVPCAdapter(statemachine.Adapter):
def listResources(self):
for vm in self._listInstances():
rg = self._getResourceGroupByReference(vm['resource_group'])
yield IBMVPCResource('instance', vm, rg)
yield IBMVPCResource(IBMVPCResource.TYPE_INSTANCE, vm, rg)
for fip in self._listFloatingIPs():
rg = self._getResourceGroupByReference(fip['resource_group'])
yield IBMVPCResource('fip', fip, rg)
yield IBMVPCResource(IBMVPCResource.TYPE_FIP, fip, rg)
for image in self._listImages():
if image['owner_type'] != 'user':
continue
rg = self._getResourceGroupByReference(image['resource_group'])
yield IBMVPCResource('image', image, rg)
yield IBMVPCResource(IBMVPCResource.TYPE_IMAGE, image, rg)
rgname = make_image_resource_group_name(self.provider.name)
if self.storage_instance and rgname in self.resource_groups:
rg = self.resource_groups[rgname]
@ -418,18 +422,19 @@ class IBMVPCAdapter(statemachine.Adapter):
'name': obj['Key'],
'id': obj['Key'],
}
yield IBMVPCResource('object', storage_object, rg)
yield IBMVPCResource(IBMVPCResource.TYPE_OBJECT,
storage_object, rg)
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.name}")
with self.rate_limiter:
if resource.type == 'instance':
if resource.type == IBMVPCResource.TYPE_INSTANCE:
self.cloud_vpc.delete_instance(resource.id)
elif resource.type == 'fip':
elif resource.type == IBMVPCResource.TYPE_FIP:
self.cloud_vpc.delete_floating_ip(resource.id)
elif resource.type == 'image':
elif resource.type == IBMVPCResource.TYPE_IMAGE:
self.cloud_vpc.delete_image(resource.id)
elif resource.type == 'object':
elif resource.type == IBMVPCResource.TYPE_OBJECT:
self.cloud_storage.delete_object(
Bucket=self.provider.object_storage['bucket-name'],
Key=resource.id)

View File

@ -140,13 +140,6 @@ class MetastaticInstance(statemachine.Instance):
return QuotaInformation(instances=1)
class MetastaticResource(statemachine.Resource):
def __init__(self, metadata, type, name):
super().__init__(metadata)
self.type = type
self.name = name
class MetastaticDeleteStateMachine(statemachine.StateMachine):
DEALLOCATING = 'deallocating node'
COMPLETE = 'complete'

View File

@ -92,9 +92,10 @@ class OpenStackInstance(statemachine.Instance):
class OpenStackResource(statemachine.Resource):
TYPE_INSTANCE = 'instance'
def __init__(self, metadata, type, id):
super().__init__(metadata)
self.type = type
super().__init__(metadata, type)
self.id = id
@ -444,7 +445,7 @@ class OpenStackAdapter(statemachine.Adapter):
if server.status.lower() == 'deleted':
continue
yield OpenStackResource(server.metadata,
'server', server.id)
OpenStackResource.TYPE_INSTANCE, server.id)
# Floating IP and port leakage can't be handled by the
# automatic resource cleanup in cleanupLeakedResources because
# openstack doesn't store metadata on those objects, so we
@ -456,7 +457,7 @@ class OpenStackAdapter(statemachine.Adapter):
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == 'server':
if resource.type == OpenStackResource.TYPE_INSTANCE:
self._deleteServer(resource.id)
def listInstances(self):

View File

@ -528,6 +528,7 @@ class StateMachineProvider(Provider, QuotaSupport):
self.log = logging.getLogger(
f"nodepool.StateMachineProvider.{provider.name}")
super().__init__()
self._statsd = stats.get_client()
self.provider = provider
self.adapter = adapter
# State machines
@ -742,6 +743,11 @@ class StateMachineProvider(Provider, QuotaSupport):
# condition.
try:
self.adapter.deleteResource(resource)
if self._statsd:
key = ('nodepool.provider.%s.leaked.%s'
% (self.provider.name,
resource.plural_metric_name))
self._statsd.incr(key, 1)
except Exception:
self.log.exception("Unable to delete leaked "
f"resource for node {node_id}")
@ -752,6 +758,11 @@ class StateMachineProvider(Provider, QuotaSupport):
# condition.
try:
self.adapter.deleteResource(resource)
if self._statsd:
key = ('nodepool.provider.%s.leaked.%s'
% (self.provider.name,
resource.plural_metric_name))
self._statsd.incr(key, 1)
except Exception:
self.log.exception("Unable to delete leaked "
f"resource for upload {upload_id}")
@ -887,20 +898,28 @@ class Resource:
This could be an instance, a disk, a floating IP, or anything
else. It is used by the driver to detect leaked resources so the
adapter can clean them up. There is one required attribute,
`metadata`.
adapter can clean them up.
This is a dictionary of key/value pairs initially supplied by the
driver to the adapter when an instance or image was created. This
is used by the driver to detect leaked resources. The adapter may
add any other information to this instance for its own bookeeping
(resource type, id, etc).
The `type` attribute should be an alphanumeric string suitable for
inclusion in a statsd metric name.
The `metadata` attribute is a dictionary of key/value pairs
initially supplied by the driver to the adapter when an instance
or image was created. This is used by the driver to detect leaked
resources. The adapter may add any other information to this
instance for its own bookeeping (resource type, id, etc).
The 'plural_metric_name' attribute is set in the constructor
automatically; override this value if necessary.
:param str type: The type of resource.
:param dict metadata: A dictionary of metadata for the resource.
"""
def __init__(self, metadata):
def __init__(self, metadata, type):
self.type = type
self.plural_metric_name = type + 's'
self.metadata = metadata

View File

@ -766,6 +766,15 @@ class TestDriverAws(tests.DBTestCase):
Bucket=obj.bucket_name, Key=obj.key)
except self.s3_client.exceptions.NoSuchKey:
break
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.instances',
value='1', kind='c')
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.volumes',
value='1', kind='c')
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.objects',
value='1', kind='c')
def test_aws_resource_cleanup_import_snapshot(self):
# This tests the import_snapshot path

View File

@ -1332,6 +1332,9 @@ class TestLauncher(tests.DBTestCase):
# Make sure we end up with only one server (the replacement)
servers = manager.adapter._listServers()
self.assertEqual(len(servers), 1)
self.assertReportedStat(
'nodepool.provider.fake-provider.leaked.instances',
value='1', kind='c')
def test_max_ready_age(self):
"""Test a node with exceeded max-ready-age is deleted"""

View File

@ -0,0 +1,4 @@
---
features:
- |
Many more drivers now report metrics for leaked resources.