Added resources support in pollster's interface

1. Change the pollster's interface to support resources:
- Added optional parameter 'resources' to the method get_samples. It
  specifies a list of the endpoints from which the pollster would get
  data.

2. Change the PollingTask to record the resources for each pollster.

3. Pass the resources recorded in PollingTask to the pollsters in
central agent.

This patch only changes the interface of pollsters and PollingTask in
central agent. The existing central agent's pollsters still ignore the
resources passed in by the PollingTask.

Implements: blueprint support-resources-pipeline-item
Change-Id: Ic7a83c63ea291d0527004113bc8d8bb09311d14f
This commit is contained in:
Lianhao Lu 2013-11-26 18:13:21 +08:00
parent 2d8e6d9264
commit 5de8c6fce5
9 changed files with 54 additions and 14 deletions

View File

@ -17,6 +17,7 @@
# under the License. # under the License.
import abc import abc
import collections
import itertools import itertools
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
@ -37,11 +38,17 @@ class PollingTask(object):
def __init__(self, agent_manager): def __init__(self, agent_manager):
self.manager = agent_manager self.manager = agent_manager
self.pollsters = set() self.pollsters = set()
# Resource definitions are indexed by the pollster
# Use dict of set here to remove the duplicated resource definitions
# for each pollster.
self.resources = collections.defaultdict(set)
self.publish_context = pipeline.PublishContext( self.publish_context = pipeline.PublishContext(
agent_manager.context) agent_manager.context)
def add(self, pollster, pipelines): def add(self, pollster, pipelines):
self.publish_context.add_pipelines(pipelines) self.publish_context.add_pipelines(pipelines)
for pipeline in pipelines:
self.resources[pollster.name].update(pipeline.resources)
self.pollsters.update([pollster]) self.pollsters.update([pollster])
@abc.abstractmethod @abc.abstractmethod

View File

@ -41,9 +41,11 @@ class PollingTask(agent.PollingTask):
for pollster in self.pollsters: for pollster in self.pollsters:
try: try:
LOG.info(_("Polling pollster %s"), pollster.name) LOG.info(_("Polling pollster %s"), pollster.name)
resources = list(self.resources[pollster.name])
samples = list(pollster.obj.get_samples( samples = list(pollster.obj.get_samples(
self.manager, self.manager,
cache, cache,
resources=resources,
)) ))
publisher(samples) publisher(samples)
except Exception as err: except Exception as err:

View File

@ -82,7 +82,7 @@ class _Base(plugin.CentralPollster):
class EnergyPollster(_Base): class EnergyPollster(_Base):
"""Measures energy consumption.""" """Measures energy consumption."""
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
"""Returns all samples.""" """Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache): for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
@ -102,7 +102,7 @@ class EnergyPollster(_Base):
class PowerPollster(_Base): class PowerPollster(_Base):
"""Measures power consumption.""" """Measures power consumption."""
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
"""Returns all samples.""" """Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache): for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(

View File

@ -101,7 +101,7 @@ class _Base(plugin.PollsterBase):
class ImagePollster(_Base): class ImagePollster(_Base):
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for image in self._iter_images(manager.keystone, cache): for image in self._iter_images(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
name='image', name='image',
@ -118,7 +118,7 @@ class ImagePollster(_Base):
class ImageSizePollster(_Base): class ImageSizePollster(_Base):
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for image in self._iter_images(manager.keystone, cache): for image in self._iter_images(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
name='image.size', name='image.size',

View File

@ -40,7 +40,7 @@ class FloatingIPPollster(plugin.CentralPollster):
cache['floating_ips'] = list(self._get_floating_ips()) cache['floating_ips'] = list(self._get_floating_ips())
return iter(cache['floating_ips']) return iter(cache['floating_ips'])
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for ip in self._iter_floating_ips(cache): for ip in self._iter_floating_ips(cache):
self.LOG.info(_("FLOATING IP USAGE: %s") % ip.ip) self.LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
# FIXME (flwang) Now Nova API /os-floating-ips can't provide those # FIXME (flwang) Now Nova API /os-floating-ips can't provide those

View File

@ -88,7 +88,7 @@ class ObjectsPollster(_Base):
"""Iterate over all accounts, using keystone. """Iterate over all accounts, using keystone.
""" """
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for tenant, account in self._iter_accounts(manager.keystone, cache): for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
name='storage.objects', name='storage.objects',
@ -107,7 +107,7 @@ class ObjectsSizePollster(_Base):
"""Iterate over all accounts, using keystone. """Iterate over all accounts, using keystone.
""" """
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for tenant, account in self._iter_accounts(manager.keystone, cache): for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
name='storage.objects.size', name='storage.objects.size',
@ -126,7 +126,7 @@ class ObjectsContainersPollster(_Base):
"""Iterate over all accounts, using keystone. """Iterate over all accounts, using keystone.
""" """
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for tenant, account in self._iter_accounts(manager.keystone, cache): for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample( yield sample.Sample(
name='storage.objects.containers', name='storage.objects.containers',
@ -147,7 +147,7 @@ class ContainersObjectsPollster(_Base):
METHOD = 'get' METHOD = 'get'
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for project, account in self._iter_accounts(manager.keystone, cache): for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1] containers_info = account[1]
for container in containers_info: for container in containers_info:
@ -170,7 +170,7 @@ class ContainersSizePollster(_Base):
METHOD = 'get' METHOD = 'get'
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
for project, account in self._iter_accounts(manager.keystone, cache): for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1] containers_info = account[1]
for container in containers_info: for container in containers_info:

View File

@ -21,6 +21,7 @@
import abc import abc
import collections import collections
import fnmatch import fnmatch
from oslo.config import cfg from oslo.config import cfg
import six import six
@ -90,7 +91,7 @@ class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API.""" """Base class for plugins that support the polling API."""
@abc.abstractmethod @abc.abstractmethod
def get_samples(self, manager, cache): def get_samples(self, manager, cache, resources=[]):
"""Return a sequence of Counter instances from polling the resources. """Return a sequence of Counter instances from polling the resources.
:param manager: The service manager class invoking the plugin. :param manager: The service manager class invoking the plugin.
@ -98,5 +99,8 @@ class PollsterBase(PluginBase):
between themselves when recomputing it would be between themselves when recomputing it would be
expensive (e.g., asking another service for a expensive (e.g., asking another service for a
list of objects). list of objects).
:param resources: A list of the endpoints the pollster will get data
from. It's up to the specific pollster to decide
how to use it.
""" """

View File

@ -29,6 +29,7 @@ from stevedore.tests import manager as extension_tests
from ceilometer import agent from ceilometer import agent
from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common.fixture import config
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import plugin
from ceilometer import sample from ceilometer import sample
from ceilometer.tests import base from ceilometer.tests import base
from ceilometer import transformer from ceilometer import transformer
@ -47,21 +48,23 @@ default_test_data = sample.Sample(
) )
class TestPollster: class TestPollster(plugin.PollsterBase):
test_data = default_test_data test_data = default_test_data
def get_samples(self, manager, cache, instance=None): def get_samples(self, manager, cache, instance=None, resources=[]):
self.samples.append((manager, instance)) self.samples.append((manager, instance))
self.resources.extend(resources)
return [self.test_data] return [self.test_data]
class TestPollsterException(TestPollster): class TestPollsterException(TestPollster):
def get_samples(self, manager, cache, instance=None): def get_samples(self, manager, cache, instance=None, resources=[]):
# Put an instance parameter here so that it can be used # Put an instance parameter here so that it can be used
# by both central manager and compute manager # by both central manager and compute manager
# In future, we possibly don't need such hack if we # In future, we possibly don't need such hack if we
# combine the get_samples() function again # combine the get_samples() function again
self.samples.append((manager, instance)) self.samples.append((manager, instance))
self.resources.extend(resources)
raise Exception() raise Exception()
@ -69,10 +72,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class Pollster(TestPollster): class Pollster(TestPollster):
samples = [] samples = []
resources = []
test_data = default_test_data test_data = default_test_data
class PollsterAnother(TestPollster): class PollsterAnother(TestPollster):
samples = [] samples = []
resources = []
test_data = sample.Sample( test_data = sample.Sample(
name='testanother', name='testanother',
type=default_test_data.type, type=default_test_data.type,
@ -86,6 +91,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class PollsterException(TestPollsterException): class PollsterException(TestPollsterException):
samples = [] samples = []
resources = []
test_data = sample.Sample( test_data = sample.Sample(
name='testexception', name='testexception',
type=default_test_data.type, type=default_test_data.type,
@ -99,6 +105,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class PollsterExceptionAnother(TestPollsterException): class PollsterExceptionAnother(TestPollsterException):
samples = [] samples = []
resources = []
test_data = sample.Sample( test_data = sample.Sample(
name='testexceptionanother', name='testexceptionanother',
type=default_test_data.type, type=default_test_data.type,
@ -159,6 +166,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 60, 'interval': 60,
'counters': ['test'], 'counters': ['test'],
'resources': ['test://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, ] }, ]
@ -174,12 +182,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.PollsterAnother.samples = [] self.PollsterAnother.samples = []
self.PollsterException.samples = [] self.PollsterException.samples = []
self.PollsterExceptionAnother.samples = [] self.PollsterExceptionAnother.samples = []
self.Pollster.resources = []
self.PollsterAnother.resources = []
self.PollsterException.resources = []
self.PollsterExceptionAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown() super(BaseAgentManagerTestCase, self).tearDown()
def test_setup_polling_tasks(self): def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks() polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1) self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys()) self.assertTrue(60 in polling_tasks.keys())
self.assertEqual(len(polling_tasks[60].resources), 1)
self.assertEqual(len(polling_tasks[60].resources['test']), 1)
self.mgr.interval_task(polling_tasks.values()[0]) self.mgr.interval_task(polling_tasks.values()[0])
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(pub.samples[0], self.Pollster.test_data) self.assertEqual(pub.samples[0], self.Pollster.test_data)
@ -189,6 +203,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 10, 'interval': 10,
'counters': ['test'], 'counters': ['test'],
'resources': ['test://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}) })
@ -204,6 +219,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline_1", 'name': "test_pipeline_1",
'interval': 10, 'interval': 10,
'counters': ['test_invalid'], 'counters': ['test_invalid'],
'resources': ['invalid://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}) })
@ -216,6 +232,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 60, 'interval': 60,
'counters': ['testanother'], 'counters': ['testanother'],
'resources': ['testanother://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}) })
@ -224,6 +241,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(len(polling_tasks), 1) self.assertEqual(len(polling_tasks), 1)
pollsters = polling_tasks.get(60).pollsters pollsters = polling_tasks.get(60).pollsters
self.assertEqual(len(pollsters), 2) self.assertEqual(len(pollsters), 2)
self.assertEqual(len(polling_tasks[60].resources), 2)
self.assertEqual(len(polling_tasks[60].resources['test']), 1)
self.assertEqual(len(polling_tasks[60].resources['testanother']), 1)
def test_interval_exception_isolation(self): def test_interval_exception_isolation(self):
self.pipeline_cfg = [ self.pipeline_cfg = [
@ -231,6 +251,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline_1", 'name': "test_pipeline_1",
'interval': 10, 'interval': 10,
'counters': ['testexceptionanother'], 'counters': ['testexceptionanother'],
'resources': ['test://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, },
@ -238,6 +259,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline_2", 'name': "test_pipeline_2",
'interval': 10, 'interval': 10,
'counters': ['testexception'], 'counters': ['testexception'],
'resources': ['test://'],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, },

View File

@ -44,3 +44,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.useFixture(mockpatch.Patch( self.useFixture(mockpatch.Patch(
'keystoneclient.v2_0.client.Client', 'keystoneclient.v2_0.client.Client',
return_value=None)) return_value=None))
def test_get_sample_resources(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.values()[0])
self.assertTrue(self.Pollster.resources)