From 87afc5ada176d2062bc0cc81c248d6813941a7d4 Mon Sep 17 00:00:00 2001 From: Nejc Saje Date: Tue, 9 Sep 2014 03:44:35 -0400 Subject: [PATCH] Partition swift pollster resources by tenant Since the latest discovery change, Swift pollsters on different agents only decide which one of them should poll. Most of the time there's only one endpoint, so only one agent gets to do any work. This patch fixes this by introducing a new TenantDiscovery, which enables Swift (and, in the future, other) pollsters to partition the set of keystone tenants among them and then each poll the samples for their assigned subset of tenants. Closes-Bug: #1365351 Change-Id: Iba4a3b91d5ee978213fdd6fcf8bb62b315324a52 --- ceilometer/central/discovery.py | 20 +++ ceilometer/objectstore/swift.py | 192 +++++++++++---------- ceilometer/plugin.py | 10 ++ ceilometer/tests/objectstore/test_swift.py | 99 +++++++---- setup.cfg | 1 + 5 files changed, 201 insertions(+), 121 deletions(-) diff --git a/ceilometer/central/discovery.py b/ceilometer/central/discovery.py index 902f37ad2..1e3ccb3f4 100644 --- a/ceilometer/central/discovery.py +++ b/ceilometer/central/discovery.py @@ -27,6 +27,13 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service') class EndpointDiscovery(plugin.DiscoveryBase): + """Discovery that supplies service endpoints. + + This discovery should be used when the relevant APIs are not well suited + to dividing the pollster's work into smaller pieces than a whole service + at once. Example of this is the floating_ip pollster which calls + nova.floating_ips.list() and therefore gets all floating IPs at once. + """ def discover(self, manager, param=None): if not param: @@ -40,3 +47,16 @@ class EndpointDiscovery(plugin.DiscoveryBase): return [] else: return endpoints + + +class TenantDiscovery(plugin.DiscoveryBase): + """Discovery that supplies keystone tenants. + + This discovery should be used when the pollster's work can't be divided + into smaller pieces than per-tenant. 
Example of this is the Swift + pollster, which polls account details and does so per-tenant. + """ + + def discover(self, manager, param=None): + tenants = manager.keystone.tenants.list() + return tenants or [] diff --git a/ceilometer/objectstore/swift.py b/ceilometer/objectstore/swift.py index d4c14d020..d87f99547 100644 --- a/ceilometer/objectstore/swift.py +++ b/ceilometer/objectstore/swift.py @@ -19,12 +19,14 @@ from __future__ import absolute_import +from keystoneclient import exceptions from oslo.config import cfg from oslo.utils import timeutils import six.moves.urllib.parse as urlparse from swiftclient import client as swift from ceilometer.central import plugin +from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer import sample @@ -43,30 +45,44 @@ cfg.CONF.register_opts(OPTS) class _Base(plugin.CentralPollster): - CACHE_KEY_TENANT = 'tenants' METHOD = 'head' + _ENDPOINT = None @property def default_discovery(self): - return 'endpoint:object-store' + return 'tenant' @property def CACHE_KEY_METHOD(self): return 'swift.%s_account' % self.METHOD - def _iter_accounts(self, ksclient, cache, endpoint): - key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT) - key_method = '%s-%s' % (endpoint, self.CACHE_KEY_METHOD) - if key_tenant not in cache: - cache[key_tenant] = ksclient.tenants.list() - if key_method not in cache: - cache[key_method] = list(self._get_account_info( - ksclient, cache, endpoint)) - return iter(cache[key_method]) + @staticmethod + def _get_endpoint(ksclient): + # we store the endpoint as a base class attribute, so keystone is + # only ever called once + if _Base._ENDPOINT is None: + try: + endpoint_type = cfg.CONF.service_credentials.os_endpoint_type + endpoint = ksclient.service_catalog.url_for( + service_type='object-store', + endpoint_type=endpoint_type) + _Base._ENDPOINT = endpoint + except exceptions.EndpointNotFound: + LOG.debug(_("Swift endpoint not found")) + return 
_Base._ENDPOINT - def _get_account_info(self, ksclient, cache, endpoint): - key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT) - for t in cache[key_tenant]: + def _iter_accounts(self, ksclient, cache, tenants): + if self.CACHE_KEY_METHOD not in cache: + cache[self.CACHE_KEY_METHOD] = list(self._get_account_info( + ksclient, tenants)) + return iter(cache[self.CACHE_KEY_METHOD]) + + def _get_account_info(self, ksclient, tenants): + endpoint = self._get_endpoint(ksclient) + if not endpoint: + raise StopIteration() + + for t in tenants: api_method = '%s_account' % self.METHOD yield (t.id, getattr(swift, api_method) (self._neaten_url(endpoint, t.id), @@ -82,58 +98,58 @@ class _Base(plugin.CentralPollster): class ObjectsPollster(_Base): """Iterate over all accounts, using keystone.""" def get_samples(self, manager, cache, resources): - for endpoint in resources: - for tenant, account in self._iter_accounts(manager.keystone, - cache, endpoint): - yield sample.Sample( - name='storage.objects', - type=sample.TYPE_GAUGE, - volume=int(account['x-account-object-count']), - unit='object', - user_id=None, - project_id=tenant, - resource_id=tenant, - timestamp=timeutils.isotime(), - resource_metadata=None, - ) + tenants = resources + for tenant, account in self._iter_accounts(manager.keystone, + cache, tenants): + yield sample.Sample( + name='storage.objects', + type=sample.TYPE_GAUGE, + volume=int(account['x-account-object-count']), + unit='object', + user_id=None, + project_id=tenant, + resource_id=tenant, + timestamp=timeutils.isotime(), + resource_metadata=None, + ) class ObjectsSizePollster(_Base): """Iterate over all accounts, using keystone.""" def get_samples(self, manager, cache, resources): - for endpoint in resources: - for tenant, account in self._iter_accounts(manager.keystone, - cache, endpoint): - yield sample.Sample( - name='storage.objects.size', - type=sample.TYPE_GAUGE, - volume=int(account['x-account-bytes-used']), - unit='B', - user_id=None, - 
project_id=tenant, - resource_id=tenant, - timestamp=timeutils.isotime(), - resource_metadata=None, - ) + tenants = resources + for tenant, account in self._iter_accounts(manager.keystone, + cache, tenants): + yield sample.Sample( + name='storage.objects.size', + type=sample.TYPE_GAUGE, + volume=int(account['x-account-bytes-used']), + unit='B', + user_id=None, + project_id=tenant, + resource_id=tenant, + timestamp=timeutils.isotime(), + resource_metadata=None, + ) class ObjectsContainersPollster(_Base): """Iterate over all accounts, using keystone.""" def get_samples(self, manager, cache, resources): - for endpoint in resources: - for tenant, account in self._iter_accounts(manager.keystone, - cache, endpoint): - yield sample.Sample( - name='storage.objects.containers', - type=sample.TYPE_GAUGE, - volume=int(account['x-account-container-count']), - unit='container', - user_id=None, - project_id=tenant, - resource_id=tenant, - timestamp=timeutils.isotime(), - resource_metadata=None, - ) + tenants = resources + for tenant, account in self._iter_accounts(manager.keystone, + cache, tenants): + yield sample.Sample( + name='storage.objects.containers', + type=sample.TYPE_GAUGE, + volume=int(account['x-account-container-count']), + unit='container', + user_id=None, + project_id=tenant, + resource_id=tenant, + timestamp=timeutils.isotime(), + resource_metadata=None, + ) class ContainersObjectsPollster(_Base): @@ -142,22 +158,22 @@ class ContainersObjectsPollster(_Base): METHOD = 'get' def get_samples(self, manager, cache, resources): - for endpoint in resources: - for project, account in self._iter_accounts(manager.keystone, - cache, endpoint): - containers_info = account[1] - for container in containers_info: - yield sample.Sample( - name='storage.containers.objects', - type=sample.TYPE_GAUGE, - volume=int(container['count']), - unit='object', - user_id=None, - project_id=project, - resource_id=project + '/' + container['name'], - timestamp=timeutils.isotime(), - 
resource_metadata=None, - ) + tenants = resources + for tenant, account in self._iter_accounts(manager.keystone, + cache, tenants): + containers_info = account[1] + for container in containers_info: + yield sample.Sample( + name='storage.containers.objects', + type=sample.TYPE_GAUGE, + volume=int(container['count']), + unit='object', + user_id=None, + project_id=tenant, + resource_id=tenant + '/' + container['name'], + timestamp=timeutils.isotime(), + resource_metadata=None, + ) class ContainersSizePollster(_Base): @@ -166,19 +182,19 @@ class ContainersSizePollster(_Base): METHOD = 'get' def get_samples(self, manager, cache, resources): - for endpoint in resources: - for project, account in self._iter_accounts(manager.keystone, - cache, endpoint): - containers_info = account[1] - for container in containers_info: - yield sample.Sample( - name='storage.containers.objects.size', - type=sample.TYPE_GAUGE, - volume=int(container['bytes']), - unit='B', - user_id=None, - project_id=project, - resource_id=project + '/' + container['name'], - timestamp=timeutils.isotime(), - resource_metadata=None, - ) + tenants = resources + for tenant, account in self._iter_accounts(manager.keystone, + cache, tenants): + containers_info = account[1] + for container in containers_info: + yield sample.Sample( + name='storage.containers.objects.size', + type=sample.TYPE_GAUGE, + volume=int(container['bytes']), + unit='B', + user_id=None, + project_id=tenant, + resource_id=tenant + '/' + container['name'], + timestamp=timeutils.isotime(), + resource_metadata=None, + ) diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index a0fb9fa17..1e96e9e8a 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -166,6 +166,16 @@ class DiscoveryBase(object): def discover(self, manager, param=None): """Discover resources to monitor. + The most fine-grained discovery should be preferred, so the work is + the most evenly distributed among multiple agents (if they exist). 
+ + For example: + if the pollster can separately poll individual resources, it should + have its own discovery implementation to discover those resources. If + it can only poll per-tenant, then the `TenantDiscovery` should be + used. If even that is not possible, use `EndpointDiscovery` (see + their respective docstrings). + :param manager: The service manager class invoking the plugin. :param param: an optional parameter to guide the discovery """ diff --git a/ceilometer/tests/objectstore/test_swift.py b/ceilometer/tests/objectstore/test_swift.py index b6f5d0594..c1bc6a2aa 100644 --- a/ceilometer/tests/objectstore/test_swift.py +++ b/ceilometer/tests/objectstore/test_swift.py @@ -35,9 +35,13 @@ HEAD_ACCOUNTS = [('tenant-000', {'x-account-object-count': 12, ('tenant-001', {'x-account-object-count': 34, 'x-account-bytes-used': 9898989898, 'x-account-container-count': 17, - })] + }), + ('tenant-002-ignored', {'x-account-object-count': 34, + 'x-account-bytes-used': 9898989898, + 'x-account-container-count': 17, + })] -GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10, +GET_ACCOUNTS = [('tenant-000', ({'x-account-object-count': 10, 'x-account-bytes-used': 123123, 'x-account-container-count': 2, }, @@ -48,12 +52,17 @@ GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10, 'bytes': 0, 'name': 'new_container' }])), - ('tenant-003', ({'x-account-object-count': 0, + ('tenant-001', ({'x-account-object-count': 0, 'x-account-bytes-used': 0, 'x-account-container-count': 0, - }, [])), ] + }, [])), + ('tenant-002-ignored', ({'x-account-object-count': 0, + 'x-account-bytes-used': 0, + 'x-account-container-count': 0, + }, []))] -ENDPOINT = 'end://point' +Tenant = collections.namedtuple('Tenant', 'id') +ASSIGNED_TENANTS = [Tenant('tenant-000'), Tenant('tenant-001')] class TestManager(manager.AgentManager): @@ -85,9 +94,11 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios, def fake_ks_service_catalog_url_for(*args, **kwargs): raise 
exceptions.EndpointNotFound("Fake keystone exception") - def fake_iter_accounts(self, ksclient, cache, endpoint): + def fake_iter_accounts(self, ksclient, cache, tenants): + tenant_ids = [t.id for t in tenants] for i in self.ACCOUNTS: - yield i + if i[0] in tenant_ids: + yield i @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): @@ -100,42 +111,35 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios, else: self.ACCOUNTS = GET_ACCOUNTS + def tearDown(self): + super(TestSwiftPollster, self).tearDown() + swift._Base._ENDPOINT = None + def test_iter_accounts_no_cache(self): cache = {} with mockpatch.PatchObject(self.factory, '_get_account_info', return_value=[]): data = list(self.pollster._iter_accounts(mock.Mock(), cache, - ENDPOINT)) + ASSIGNED_TENANTS)) - self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT) - in cache) - self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_METHOD) - in cache) + self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache) self.assertEqual([], data) - def test_iter_accounts_tenants_cached(self): - # Verify that if there are tenants pre-cached then the account - # info loop iterates over those instead of asking for the list - # again. - ksclient = mock.Mock() - ksclient.tenants.list.side_effect = AssertionError( + def test_iter_accounts_cached(self): + # Verify that if a method has already been called, _iter_accounts + # uses the cached version and doesn't call swiftclient. 
+ mock_method = mock.Mock() + mock_method.side_effect = AssertionError( 'should not be called', ) api_method = '%s_account' % self.pollster.METHOD - with mockpatch.PatchObject(swift_client, api_method, new=ksclient): - key = '%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT) + with mockpatch.PatchObject(swift_client, api_method, new=mock_method): with mockpatch.PatchObject(self.factory, '_neaten_url'): - Tenant = collections.namedtuple('Tenant', 'id') - cache = { - key: [ - Tenant(self.ACCOUNTS[0][0]) - ], - } + cache = {self.pollster.CACHE_KEY_METHOD: [self.ACCOUNTS[0]]} data = list(self.pollster._iter_accounts(mock.Mock(), cache, - ENDPOINT)) - self.assertTrue(key in cache) - self.assertEqual(self.ACCOUNTS[0][0], data[0][0]) + ASSIGNED_TENANTS)) + self.assertEqual([self.ACCOUNTS[0]], data) def test_neaten_url(self): test_endpoints = ['http://127.0.0.1:8080', @@ -158,24 +162,53 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios, with mockpatch.PatchObject(self.factory, '_iter_accounts', side_effect=self.fake_iter_accounts): samples = list(self.pollster.get_samples(self.manager, {}, - [ENDPOINT])) + ASSIGNED_TENANTS)) - self.assertEqual(2, len(samples)) + self.assertEqual(2, len(samples), self.pollster.__class__) def test_get_meter_names(self): with mockpatch.PatchObject(self.factory, '_iter_accounts', side_effect=self.fake_iter_accounts): samples = list(self.pollster.get_samples(self.manager, {}, - [ENDPOINT])) + ASSIGNED_TENANTS)) self.assertEqual(set([samples[0].name]), set([s.name for s in samples])) + def test_only_poll_assigned(self): + mock_method = mock.MagicMock() + endpoint = 'end://point/' + api_method = '%s_account' % self.pollster.METHOD + with mockpatch.PatchObject(swift_client, api_method, new=mock_method): + with mockpatch.PatchObject( + self.manager.keystone.service_catalog, 'url_for', + return_value=endpoint): + list(self.pollster.get_samples(self.manager, {}, + ASSIGNED_TENANTS)) + expected = 
[mock.call(self.pollster._neaten_url(endpoint, t.id), + self.manager.keystone.auth_token) + for t in ASSIGNED_TENANTS] + self.assertEqual(expected, mock_method.call_args_list) + + def test_get_endpoint_only_once(self): + mock_url_for = mock.MagicMock() + api_method = '%s_account' % self.pollster.METHOD + with mockpatch.PatchObject(swift_client, api_method, + new=mock.MagicMock()): + with mockpatch.PatchObject( + self.manager.keystone.service_catalog, 'url_for', + new=mock_url_for): + list(self.pollster.get_samples(self.manager, {}, + ASSIGNED_TENANTS)) + list(self.pollster.get_samples(self.manager, {}, + ASSIGNED_TENANTS)) + self.assertEqual(1, mock_url_for.call_count) + def test_endpoint_notfound(self): with mockpatch.PatchObject( self.manager.keystone.service_catalog, 'url_for', side_effect=self.fake_ks_service_catalog_url_for): samples = list(self.pollster.get_samples(self.manager, {}, - [ENDPOINT])) + ASSIGNED_TENANTS)) self.assertEqual(0, len(samples)) diff --git a/setup.cfg b/setup.cfg index fa8a76a4c..d1da2dc48 100644 --- a/setup.cfg +++ b/setup.cfg @@ -81,6 +81,7 @@ ceilometer.notification = ceilometer.discover = local_instances = ceilometer.compute.discovery:InstanceDiscovery endpoint = ceilometer.central.discovery:EndpointDiscovery + tenant = ceilometer.central.discovery:TenantDiscovery lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery lb_members = ceilometer.network.services.discovery:LBMembersDiscovery