Merge "Partition swift pollster resources by tenant"
This commit is contained in:
commit
145f617875
@ -27,6 +27,13 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service')
|
||||
|
||||
|
||||
class EndpointDiscovery(plugin.DiscoveryBase):
|
||||
"""Discovery that supplies service endpoints.
|
||||
|
||||
This discovery should be used when the relevant APIs are not well suited
|
||||
to dividing the pollster's work into smaller pieces than a whole service
|
||||
at once. Example of this is the floating_ip pollster which calls
|
||||
nova.floating_ips.list() and therefore gets all floating IPs at once.
|
||||
"""
|
||||
|
||||
def discover(self, manager, param=None):
|
||||
if not param:
|
||||
@ -40,3 +47,16 @@ class EndpointDiscovery(plugin.DiscoveryBase):
|
||||
return []
|
||||
else:
|
||||
return endpoints
|
||||
|
||||
|
||||
class TenantDiscovery(plugin.DiscoveryBase):
|
||||
"""Discovery that supplies keystone tenants.
|
||||
|
||||
This discovery should be used when the pollster's work can't be divided
|
||||
into smaller pieces than per-tenant. Example of this is the Swift
|
||||
pollster, which polls account details and does so per-tenant.
|
||||
"""
|
||||
|
||||
def discover(self, manager, param=None):
|
||||
tenants = manager.keystone.tenants.list()
|
||||
return tenants or []
|
||||
|
@ -19,12 +19,14 @@
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from keystoneclient import exceptions
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import timeutils
|
||||
import six.moves.urllib.parse as urlparse
|
||||
from swiftclient import client as swift
|
||||
|
||||
from ceilometer.central import plugin
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer import sample
|
||||
|
||||
@ -43,30 +45,44 @@ cfg.CONF.register_opts(OPTS)
|
||||
|
||||
class _Base(plugin.CentralPollster):
|
||||
|
||||
CACHE_KEY_TENANT = 'tenants'
|
||||
METHOD = 'head'
|
||||
_ENDPOINT = None
|
||||
|
||||
@property
|
||||
def default_discovery(self):
|
||||
return 'endpoint:object-store'
|
||||
return 'tenant'
|
||||
|
||||
@property
|
||||
def CACHE_KEY_METHOD(self):
|
||||
return 'swift.%s_account' % self.METHOD
|
||||
|
||||
def _iter_accounts(self, ksclient, cache, endpoint):
|
||||
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
|
||||
key_method = '%s-%s' % (endpoint, self.CACHE_KEY_METHOD)
|
||||
if key_tenant not in cache:
|
||||
cache[key_tenant] = ksclient.tenants.list()
|
||||
if key_method not in cache:
|
||||
cache[key_method] = list(self._get_account_info(
|
||||
ksclient, cache, endpoint))
|
||||
return iter(cache[key_method])
|
||||
@staticmethod
|
||||
def _get_endpoint(ksclient):
|
||||
# we store the endpoint as a base class attribute, so keystone is
|
||||
# only ever called once
|
||||
if _Base._ENDPOINT is None:
|
||||
try:
|
||||
endpoint_type = cfg.CONF.service_credentials.os_endpoint_type
|
||||
endpoint = ksclient.service_catalog.url_for(
|
||||
service_type='object-store',
|
||||
endpoint_type=endpoint_type)
|
||||
_Base._ENDPOINT = endpoint
|
||||
except exceptions.EndpointNotFound:
|
||||
LOG.debug(_("Swift endpoint not found"))
|
||||
return _Base._ENDPOINT
|
||||
|
||||
def _get_account_info(self, ksclient, cache, endpoint):
|
||||
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
|
||||
for t in cache[key_tenant]:
|
||||
def _iter_accounts(self, ksclient, cache, tenants):
|
||||
if self.CACHE_KEY_METHOD not in cache:
|
||||
cache[self.CACHE_KEY_METHOD] = list(self._get_account_info(
|
||||
ksclient, tenants))
|
||||
return iter(cache[self.CACHE_KEY_METHOD])
|
||||
|
||||
def _get_account_info(self, ksclient, tenants):
|
||||
endpoint = self._get_endpoint(ksclient)
|
||||
if not endpoint:
|
||||
raise StopIteration()
|
||||
|
||||
for t in tenants:
|
||||
api_method = '%s_account' % self.METHOD
|
||||
yield (t.id, getattr(swift, api_method)
|
||||
(self._neaten_url(endpoint, t.id),
|
||||
@ -82,9 +98,9 @@ class _Base(plugin.CentralPollster):
|
||||
class ObjectsPollster(_Base):
|
||||
"""Iterate over all accounts, using keystone."""
|
||||
def get_samples(self, manager, cache, resources):
|
||||
for endpoint in resources:
|
||||
tenants = resources
|
||||
for tenant, account in self._iter_accounts(manager.keystone,
|
||||
cache, endpoint):
|
||||
cache, tenants):
|
||||
yield sample.Sample(
|
||||
name='storage.objects',
|
||||
type=sample.TYPE_GAUGE,
|
||||
@ -101,9 +117,9 @@ class ObjectsPollster(_Base):
|
||||
class ObjectsSizePollster(_Base):
|
||||
"""Iterate over all accounts, using keystone."""
|
||||
def get_samples(self, manager, cache, resources):
|
||||
for endpoint in resources:
|
||||
tenants = resources
|
||||
for tenant, account in self._iter_accounts(manager.keystone,
|
||||
cache, endpoint):
|
||||
cache, tenants):
|
||||
yield sample.Sample(
|
||||
name='storage.objects.size',
|
||||
type=sample.TYPE_GAUGE,
|
||||
@ -120,9 +136,9 @@ class ObjectsSizePollster(_Base):
|
||||
class ObjectsContainersPollster(_Base):
|
||||
"""Iterate over all accounts, using keystone."""
|
||||
def get_samples(self, manager, cache, resources):
|
||||
for endpoint in resources:
|
||||
tenants = resources
|
||||
for tenant, account in self._iter_accounts(manager.keystone,
|
||||
cache, endpoint):
|
||||
cache, tenants):
|
||||
yield sample.Sample(
|
||||
name='storage.objects.containers',
|
||||
type=sample.TYPE_GAUGE,
|
||||
@ -142,9 +158,9 @@ class ContainersObjectsPollster(_Base):
|
||||
METHOD = 'get'
|
||||
|
||||
def get_samples(self, manager, cache, resources):
|
||||
for endpoint in resources:
|
||||
for project, account in self._iter_accounts(manager.keystone,
|
||||
cache, endpoint):
|
||||
tenants = resources
|
||||
for tenant, account in self._iter_accounts(manager.keystone,
|
||||
cache, tenants):
|
||||
containers_info = account[1]
|
||||
for container in containers_info:
|
||||
yield sample.Sample(
|
||||
@ -153,8 +169,8 @@ class ContainersObjectsPollster(_Base):
|
||||
volume=int(container['count']),
|
||||
unit='object',
|
||||
user_id=None,
|
||||
project_id=project,
|
||||
resource_id=project + '/' + container['name'],
|
||||
project_id=tenant,
|
||||
resource_id=tenant + '/' + container['name'],
|
||||
timestamp=timeutils.isotime(),
|
||||
resource_metadata=None,
|
||||
)
|
||||
@ -166,9 +182,9 @@ class ContainersSizePollster(_Base):
|
||||
METHOD = 'get'
|
||||
|
||||
def get_samples(self, manager, cache, resources):
|
||||
for endpoint in resources:
|
||||
for project, account in self._iter_accounts(manager.keystone,
|
||||
cache, endpoint):
|
||||
tenants = resources
|
||||
for tenant, account in self._iter_accounts(manager.keystone,
|
||||
cache, tenants):
|
||||
containers_info = account[1]
|
||||
for container in containers_info:
|
||||
yield sample.Sample(
|
||||
@ -177,8 +193,8 @@ class ContainersSizePollster(_Base):
|
||||
volume=int(container['bytes']),
|
||||
unit='B',
|
||||
user_id=None,
|
||||
project_id=project,
|
||||
resource_id=project + '/' + container['name'],
|
||||
project_id=tenant,
|
||||
resource_id=tenant + '/' + container['name'],
|
||||
timestamp=timeutils.isotime(),
|
||||
resource_metadata=None,
|
||||
)
|
||||
|
@ -166,6 +166,16 @@ class DiscoveryBase(object):
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor.
|
||||
|
||||
The most fine-grained discovery should be preferred, so the work is
|
||||
the most evenly distributed among multiple agents (if they exist).
|
||||
|
||||
For example:
|
||||
if the pollster can separately poll individual resources, it should
|
||||
have its own discovery implementation to discover those resources. If
|
||||
it can only poll per-tenant, then the `TenantDiscovery` should be
|
||||
used. If even that is not possible, use `EndpointDiscovery` (see
|
||||
their respective docstrings).
|
||||
|
||||
:param manager: The service manager class invoking the plugin.
|
||||
:param param: an optional parameter to guide the discovery
|
||||
"""
|
||||
|
@ -33,11 +33,15 @@ HEAD_ACCOUNTS = [('tenant-000', {'x-account-object-count': 12,
|
||||
'x-account-container-count': 7,
|
||||
}),
|
||||
('tenant-001', {'x-account-object-count': 34,
|
||||
'x-account-bytes-used': 9898989898,
|
||||
'x-account-container-count': 17,
|
||||
}),
|
||||
('tenant-002-ignored', {'x-account-object-count': 34,
|
||||
'x-account-bytes-used': 9898989898,
|
||||
'x-account-container-count': 17,
|
||||
})]
|
||||
|
||||
GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
|
||||
GET_ACCOUNTS = [('tenant-000', ({'x-account-object-count': 10,
|
||||
'x-account-bytes-used': 123123,
|
||||
'x-account-container-count': 2,
|
||||
},
|
||||
@ -48,12 +52,17 @@ GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
|
||||
'bytes': 0,
|
||||
'name': 'new_container'
|
||||
}])),
|
||||
('tenant-003', ({'x-account-object-count': 0,
|
||||
('tenant-001', ({'x-account-object-count': 0,
|
||||
'x-account-bytes-used': 0,
|
||||
'x-account-container-count': 0,
|
||||
}, [])), ]
|
||||
}, [])),
|
||||
('tenant-002-ignored', ({'x-account-object-count': 0,
|
||||
'x-account-bytes-used': 0,
|
||||
'x-account-container-count': 0,
|
||||
}, []))]
|
||||
|
||||
ENDPOINT = 'end://point'
|
||||
Tenant = collections.namedtuple('Tenant', 'id')
|
||||
ASSIGNED_TENANTS = [Tenant('tenant-000'), Tenant('tenant-001')]
|
||||
|
||||
|
||||
class TestManager(manager.AgentManager):
|
||||
@ -85,8 +94,10 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
def fake_ks_service_catalog_url_for(*args, **kwargs):
|
||||
raise exceptions.EndpointNotFound("Fake keystone exception")
|
||||
|
||||
def fake_iter_accounts(self, ksclient, cache, endpoint):
|
||||
def fake_iter_accounts(self, ksclient, cache, tenants):
|
||||
tenant_ids = [t.id for t in tenants]
|
||||
for i in self.ACCOUNTS:
|
||||
if i[0] in tenant_ids:
|
||||
yield i
|
||||
|
||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
@ -100,42 +111,35 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
else:
|
||||
self.ACCOUNTS = GET_ACCOUNTS
|
||||
|
||||
def tearDown(self):
|
||||
super(TestSwiftPollster, self).tearDown()
|
||||
swift._Base._ENDPOINT = None
|
||||
|
||||
def test_iter_accounts_no_cache(self):
|
||||
cache = {}
|
||||
with mockpatch.PatchObject(self.factory, '_get_account_info',
|
||||
return_value=[]):
|
||||
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
|
||||
ENDPOINT))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
|
||||
in cache)
|
||||
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_METHOD)
|
||||
in cache)
|
||||
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
|
||||
self.assertEqual([], data)
|
||||
|
||||
def test_iter_accounts_tenants_cached(self):
|
||||
# Verify that if there are tenants pre-cached then the account
|
||||
# info loop iterates over those instead of asking for the list
|
||||
# again.
|
||||
ksclient = mock.Mock()
|
||||
ksclient.tenants.list.side_effect = AssertionError(
|
||||
def test_iter_accounts_cached(self):
|
||||
# Verify that if a method has already been called, _iter_accounts
|
||||
# uses the cached version and doesn't call swiftclient.
|
||||
mock_method = mock.Mock()
|
||||
mock_method.side_effect = AssertionError(
|
||||
'should not be called',
|
||||
)
|
||||
|
||||
api_method = '%s_account' % self.pollster.METHOD
|
||||
with mockpatch.PatchObject(swift_client, api_method, new=ksclient):
|
||||
key = '%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
|
||||
with mockpatch.PatchObject(swift_client, api_method, new=mock_method):
|
||||
with mockpatch.PatchObject(self.factory, '_neaten_url'):
|
||||
Tenant = collections.namedtuple('Tenant', 'id')
|
||||
cache = {
|
||||
key: [
|
||||
Tenant(self.ACCOUNTS[0][0])
|
||||
],
|
||||
}
|
||||
cache = {self.pollster.CACHE_KEY_METHOD: [self.ACCOUNTS[0]]}
|
||||
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
|
||||
ENDPOINT))
|
||||
self.assertTrue(key in cache)
|
||||
self.assertEqual(self.ACCOUNTS[0][0], data[0][0])
|
||||
ASSIGNED_TENANTS))
|
||||
self.assertEqual([self.ACCOUNTS[0]], data)
|
||||
|
||||
def test_neaten_url(self):
|
||||
test_endpoints = ['http://127.0.0.1:8080',
|
||||
@ -158,24 +162,53 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
with mockpatch.PatchObject(self.factory, '_iter_accounts',
|
||||
side_effect=self.fake_iter_accounts):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(2, len(samples))
|
||||
self.assertEqual(2, len(samples), self.pollster.__class__)
|
||||
|
||||
def test_get_meter_names(self):
|
||||
with mockpatch.PatchObject(self.factory, '_iter_accounts',
|
||||
side_effect=self.fake_iter_accounts):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(set([samples[0].name]),
|
||||
set([s.name for s in samples]))
|
||||
|
||||
def test_only_poll_assigned(self):
|
||||
mock_method = mock.MagicMock()
|
||||
endpoint = 'end://point/'
|
||||
api_method = '%s_account' % self.pollster.METHOD
|
||||
with mockpatch.PatchObject(swift_client, api_method, new=mock_method):
|
||||
with mockpatch.PatchObject(
|
||||
self.manager.keystone.service_catalog, 'url_for',
|
||||
return_value=endpoint):
|
||||
list(self.pollster.get_samples(self.manager, {},
|
||||
ASSIGNED_TENANTS))
|
||||
expected = [mock.call(self.pollster._neaten_url(endpoint, t.id),
|
||||
self.manager.keystone.auth_token)
|
||||
for t in ASSIGNED_TENANTS]
|
||||
self.assertEqual(expected, mock_method.call_args_list)
|
||||
|
||||
def test_get_endpoint_only_once(self):
|
||||
mock_url_for = mock.MagicMock()
|
||||
api_method = '%s_account' % self.pollster.METHOD
|
||||
with mockpatch.PatchObject(swift_client, api_method,
|
||||
new=mock.MagicMock()):
|
||||
with mockpatch.PatchObject(
|
||||
self.manager.keystone.service_catalog, 'url_for',
|
||||
new=mock_url_for):
|
||||
list(self.pollster.get_samples(self.manager, {},
|
||||
ASSIGNED_TENANTS))
|
||||
list(self.pollster.get_samples(self.manager, {},
|
||||
ASSIGNED_TENANTS))
|
||||
self.assertEqual(1, mock_url_for.call_count)
|
||||
|
||||
def test_endpoint_notfound(self):
|
||||
with mockpatch.PatchObject(
|
||||
self.manager.keystone.service_catalog, 'url_for',
|
||||
side_effect=self.fake_ks_service_catalog_url_for):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(0, len(samples))
|
||||
|
@ -81,6 +81,7 @@ ceilometer.notification =
|
||||
ceilometer.discover =
|
||||
local_instances = ceilometer.compute.discovery:InstanceDiscovery
|
||||
endpoint = ceilometer.central.discovery:EndpointDiscovery
|
||||
tenant = ceilometer.central.discovery:TenantDiscovery
|
||||
lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery
|
||||
lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery
|
||||
lb_members = ceilometer.network.services.discovery:LBMembersDiscovery
|
||||
|
Loading…
Reference in New Issue
Block a user