Merge "Added resources support in pollster's interface"
This commit is contained in:
commit
a00917a935
@ -17,6 +17,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
import collections
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
from ceilometer.openstack.common import context
|
from ceilometer.openstack.common import context
|
||||||
@ -37,11 +38,17 @@ class PollingTask(object):
|
|||||||
def __init__(self, agent_manager):
|
def __init__(self, agent_manager):
|
||||||
self.manager = agent_manager
|
self.manager = agent_manager
|
||||||
self.pollsters = set()
|
self.pollsters = set()
|
||||||
|
# Resource definitions are indexed by the pollster
|
||||||
|
# Use dict of set here to remove the duplicated resource definitions
|
||||||
|
# for each pollster.
|
||||||
|
self.resources = collections.defaultdict(set)
|
||||||
self.publish_context = pipeline.PublishContext(
|
self.publish_context = pipeline.PublishContext(
|
||||||
agent_manager.context)
|
agent_manager.context)
|
||||||
|
|
||||||
def add(self, pollster, pipelines):
|
def add(self, pollster, pipelines):
|
||||||
self.publish_context.add_pipelines(pipelines)
|
self.publish_context.add_pipelines(pipelines)
|
||||||
|
for pipeline in pipelines:
|
||||||
|
self.resources[pollster.name].update(pipeline.resources)
|
||||||
self.pollsters.update([pollster])
|
self.pollsters.update([pollster])
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
|
@ -41,9 +41,11 @@ class PollingTask(agent.PollingTask):
|
|||||||
for pollster in self.pollsters:
|
for pollster in self.pollsters:
|
||||||
try:
|
try:
|
||||||
LOG.info(_("Polling pollster %s"), pollster.name)
|
LOG.info(_("Polling pollster %s"), pollster.name)
|
||||||
|
resources = list(self.resources[pollster.name])
|
||||||
samples = list(pollster.obj.get_samples(
|
samples = list(pollster.obj.get_samples(
|
||||||
self.manager,
|
self.manager,
|
||||||
cache,
|
cache,
|
||||||
|
resources=resources,
|
||||||
))
|
))
|
||||||
publisher(samples)
|
publisher(samples)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
@ -82,7 +82,7 @@ class _Base(plugin.CentralPollster):
|
|||||||
class EnergyPollster(_Base):
|
class EnergyPollster(_Base):
|
||||||
"""Measures energy consumption."""
|
"""Measures energy consumption."""
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
"""Returns all samples."""
|
"""Returns all samples."""
|
||||||
for probe in self._iter_probes(manager.keystone, cache):
|
for probe in self._iter_probes(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
@ -102,7 +102,7 @@ class EnergyPollster(_Base):
|
|||||||
class PowerPollster(_Base):
|
class PowerPollster(_Base):
|
||||||
"""Measures power consumption."""
|
"""Measures power consumption."""
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
"""Returns all samples."""
|
"""Returns all samples."""
|
||||||
for probe in self._iter_probes(manager.keystone, cache):
|
for probe in self._iter_probes(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
|
@ -101,7 +101,7 @@ class _Base(plugin.PollsterBase):
|
|||||||
|
|
||||||
class ImagePollster(_Base):
|
class ImagePollster(_Base):
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for image in self._iter_images(manager.keystone, cache):
|
for image in self._iter_images(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
name='image',
|
name='image',
|
||||||
@ -118,7 +118,7 @@ class ImagePollster(_Base):
|
|||||||
|
|
||||||
class ImageSizePollster(_Base):
|
class ImageSizePollster(_Base):
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for image in self._iter_images(manager.keystone, cache):
|
for image in self._iter_images(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
name='image.size',
|
name='image.size',
|
||||||
|
@ -40,7 +40,7 @@ class FloatingIPPollster(plugin.CentralPollster):
|
|||||||
cache['floating_ips'] = list(self._get_floating_ips())
|
cache['floating_ips'] = list(self._get_floating_ips())
|
||||||
return iter(cache['floating_ips'])
|
return iter(cache['floating_ips'])
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for ip in self._iter_floating_ips(cache):
|
for ip in self._iter_floating_ips(cache):
|
||||||
self.LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
|
self.LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
|
||||||
# FIXME (flwang) Now Nova API /os-floating-ips can't provide those
|
# FIXME (flwang) Now Nova API /os-floating-ips can't provide those
|
||||||
|
@ -88,7 +88,7 @@ class ObjectsPollster(_Base):
|
|||||||
"""Iterate over all accounts, using keystone.
|
"""Iterate over all accounts, using keystone.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
name='storage.objects',
|
name='storage.objects',
|
||||||
@ -107,7 +107,7 @@ class ObjectsSizePollster(_Base):
|
|||||||
"""Iterate over all accounts, using keystone.
|
"""Iterate over all accounts, using keystone.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
name='storage.objects.size',
|
name='storage.objects.size',
|
||||||
@ -126,7 +126,7 @@ class ObjectsContainersPollster(_Base):
|
|||||||
"""Iterate over all accounts, using keystone.
|
"""Iterate over all accounts, using keystone.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
for tenant, account in self._iter_accounts(manager.keystone, cache):
|
||||||
yield sample.Sample(
|
yield sample.Sample(
|
||||||
name='storage.objects.containers',
|
name='storage.objects.containers',
|
||||||
@ -147,7 +147,7 @@ class ContainersObjectsPollster(_Base):
|
|||||||
|
|
||||||
METHOD = 'get'
|
METHOD = 'get'
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for project, account in self._iter_accounts(manager.keystone, cache):
|
for project, account in self._iter_accounts(manager.keystone, cache):
|
||||||
containers_info = account[1]
|
containers_info = account[1]
|
||||||
for container in containers_info:
|
for container in containers_info:
|
||||||
@ -170,7 +170,7 @@ class ContainersSizePollster(_Base):
|
|||||||
|
|
||||||
METHOD = 'get'
|
METHOD = 'get'
|
||||||
|
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
for project, account in self._iter_accounts(manager.keystone, cache):
|
for project, account in self._iter_accounts(manager.keystone, cache):
|
||||||
containers_info = account[1]
|
containers_info = account[1]
|
||||||
for container in containers_info:
|
for container in containers_info:
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import abc
|
import abc
|
||||||
import collections
|
import collections
|
||||||
import fnmatch
|
import fnmatch
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -90,7 +91,7 @@ class PollsterBase(PluginBase):
|
|||||||
"""Base class for plugins that support the polling API."""
|
"""Base class for plugins that support the polling API."""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def get_samples(self, manager, cache):
|
def get_samples(self, manager, cache, resources=[]):
|
||||||
"""Return a sequence of Counter instances from polling the resources.
|
"""Return a sequence of Counter instances from polling the resources.
|
||||||
|
|
||||||
:param manager: The service manager class invoking the plugin.
|
:param manager: The service manager class invoking the plugin.
|
||||||
@ -98,5 +99,8 @@ class PollsterBase(PluginBase):
|
|||||||
between themselves when recomputing it would be
|
between themselves when recomputing it would be
|
||||||
expensive (e.g., asking another service for a
|
expensive (e.g., asking another service for a
|
||||||
list of objects).
|
list of objects).
|
||||||
|
:param resources: A list of the endpoints the pollster will get data
|
||||||
|
from. It's up to the specific pollster to decide
|
||||||
|
how to use it.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -29,6 +29,7 @@ from stevedore.tests import manager as extension_tests
|
|||||||
from ceilometer import agent
|
from ceilometer import agent
|
||||||
from ceilometer.openstack.common.fixture import config
|
from ceilometer.openstack.common.fixture import config
|
||||||
from ceilometer import pipeline
|
from ceilometer import pipeline
|
||||||
|
from ceilometer import plugin
|
||||||
from ceilometer import sample
|
from ceilometer import sample
|
||||||
from ceilometer.tests import base
|
from ceilometer.tests import base
|
||||||
from ceilometer import transformer
|
from ceilometer import transformer
|
||||||
@ -47,21 +48,23 @@ default_test_data = sample.Sample(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestPollster:
|
class TestPollster(plugin.PollsterBase):
|
||||||
test_data = default_test_data
|
test_data = default_test_data
|
||||||
|
|
||||||
def get_samples(self, manager, cache, instance=None):
|
def get_samples(self, manager, cache, instance=None, resources=[]):
|
||||||
self.samples.append((manager, instance))
|
self.samples.append((manager, instance))
|
||||||
|
self.resources.extend(resources)
|
||||||
return [self.test_data]
|
return [self.test_data]
|
||||||
|
|
||||||
|
|
||||||
class TestPollsterException(TestPollster):
|
class TestPollsterException(TestPollster):
|
||||||
def get_samples(self, manager, cache, instance=None):
|
def get_samples(self, manager, cache, instance=None, resources=[]):
|
||||||
# Put an instance parameter here so that it can be used
|
# Put an instance parameter here so that it can be used
|
||||||
# by both central manager and compute manager
|
# by both central manager and compute manager
|
||||||
# In future, we possibly don't need such hack if we
|
# In future, we possibly don't need such hack if we
|
||||||
# combine the get_samples() function again
|
# combine the get_samples() function again
|
||||||
self.samples.append((manager, instance))
|
self.samples.append((manager, instance))
|
||||||
|
self.resources.extend(resources)
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
|
|
||||||
@ -69,10 +72,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
class Pollster(TestPollster):
|
class Pollster(TestPollster):
|
||||||
samples = []
|
samples = []
|
||||||
|
resources = []
|
||||||
test_data = default_test_data
|
test_data = default_test_data
|
||||||
|
|
||||||
class PollsterAnother(TestPollster):
|
class PollsterAnother(TestPollster):
|
||||||
samples = []
|
samples = []
|
||||||
|
resources = []
|
||||||
test_data = sample.Sample(
|
test_data = sample.Sample(
|
||||||
name='testanother',
|
name='testanother',
|
||||||
type=default_test_data.type,
|
type=default_test_data.type,
|
||||||
@ -86,6 +91,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
class PollsterException(TestPollsterException):
|
class PollsterException(TestPollsterException):
|
||||||
samples = []
|
samples = []
|
||||||
|
resources = []
|
||||||
test_data = sample.Sample(
|
test_data = sample.Sample(
|
||||||
name='testexception',
|
name='testexception',
|
||||||
type=default_test_data.type,
|
type=default_test_data.type,
|
||||||
@ -99,6 +105,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
class PollsterExceptionAnother(TestPollsterException):
|
class PollsterExceptionAnother(TestPollsterException):
|
||||||
samples = []
|
samples = []
|
||||||
|
resources = []
|
||||||
test_data = sample.Sample(
|
test_data = sample.Sample(
|
||||||
name='testexceptionanother',
|
name='testexceptionanother',
|
||||||
type=default_test_data.type,
|
type=default_test_data.type,
|
||||||
@ -159,6 +166,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline",
|
'name': "test_pipeline",
|
||||||
'interval': 60,
|
'interval': 60,
|
||||||
'counters': ['test'],
|
'counters': ['test'],
|
||||||
|
'resources': ['test://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
}, ]
|
}, ]
|
||||||
@ -174,12 +182,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
self.PollsterAnother.samples = []
|
self.PollsterAnother.samples = []
|
||||||
self.PollsterException.samples = []
|
self.PollsterException.samples = []
|
||||||
self.PollsterExceptionAnother.samples = []
|
self.PollsterExceptionAnother.samples = []
|
||||||
|
self.Pollster.resources = []
|
||||||
|
self.PollsterAnother.resources = []
|
||||||
|
self.PollsterException.resources = []
|
||||||
|
self.PollsterExceptionAnother.resources = []
|
||||||
super(BaseAgentManagerTestCase, self).tearDown()
|
super(BaseAgentManagerTestCase, self).tearDown()
|
||||||
|
|
||||||
def test_setup_polling_tasks(self):
|
def test_setup_polling_tasks(self):
|
||||||
polling_tasks = self.mgr.setup_polling_tasks()
|
polling_tasks = self.mgr.setup_polling_tasks()
|
||||||
self.assertEqual(len(polling_tasks), 1)
|
self.assertEqual(len(polling_tasks), 1)
|
||||||
self.assertTrue(60 in polling_tasks.keys())
|
self.assertTrue(60 in polling_tasks.keys())
|
||||||
|
self.assertEqual(len(polling_tasks[60].resources), 1)
|
||||||
|
self.assertEqual(len(polling_tasks[60].resources['test']), 1)
|
||||||
self.mgr.interval_task(polling_tasks.values()[0])
|
self.mgr.interval_task(polling_tasks.values()[0])
|
||||||
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
|
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(pub.samples[0], self.Pollster.test_data)
|
self.assertEqual(pub.samples[0], self.Pollster.test_data)
|
||||||
@ -189,6 +203,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline",
|
'name': "test_pipeline",
|
||||||
'interval': 10,
|
'interval': 10,
|
||||||
'counters': ['test'],
|
'counters': ['test'],
|
||||||
|
'resources': ['test://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
})
|
})
|
||||||
@ -204,6 +219,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline_1",
|
'name': "test_pipeline_1",
|
||||||
'interval': 10,
|
'interval': 10,
|
||||||
'counters': ['test_invalid'],
|
'counters': ['test_invalid'],
|
||||||
|
'resources': ['invalid://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
})
|
})
|
||||||
@ -216,6 +232,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline",
|
'name': "test_pipeline",
|
||||||
'interval': 60,
|
'interval': 60,
|
||||||
'counters': ['testanother'],
|
'counters': ['testanother'],
|
||||||
|
'resources': ['testanother://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
})
|
})
|
||||||
@ -224,6 +241,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
self.assertEqual(len(polling_tasks), 1)
|
self.assertEqual(len(polling_tasks), 1)
|
||||||
pollsters = polling_tasks.get(60).pollsters
|
pollsters = polling_tasks.get(60).pollsters
|
||||||
self.assertEqual(len(pollsters), 2)
|
self.assertEqual(len(pollsters), 2)
|
||||||
|
self.assertEqual(len(polling_tasks[60].resources), 2)
|
||||||
|
self.assertEqual(len(polling_tasks[60].resources['test']), 1)
|
||||||
|
self.assertEqual(len(polling_tasks[60].resources['testanother']), 1)
|
||||||
|
|
||||||
def test_interval_exception_isolation(self):
|
def test_interval_exception_isolation(self):
|
||||||
self.pipeline_cfg = [
|
self.pipeline_cfg = [
|
||||||
@ -231,6 +251,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline_1",
|
'name': "test_pipeline_1",
|
||||||
'interval': 10,
|
'interval': 10,
|
||||||
'counters': ['testexceptionanother'],
|
'counters': ['testexceptionanother'],
|
||||||
|
'resources': ['test://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
},
|
},
|
||||||
@ -238,6 +259,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
'name': "test_pipeline_2",
|
'name': "test_pipeline_2",
|
||||||
'interval': 10,
|
'interval': 10,
|
||||||
'counters': ['testexception'],
|
'counters': ['testexception'],
|
||||||
|
'resources': ['test://'],
|
||||||
'transformers': [],
|
'transformers': [],
|
||||||
'publishers': ["test"],
|
'publishers': ["test"],
|
||||||
},
|
},
|
||||||
|
@ -44,3 +44,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
|||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'keystoneclient.v2_0.client.Client',
|
'keystoneclient.v2_0.client.Client',
|
||||||
return_value=None))
|
return_value=None))
|
||||||
|
|
||||||
|
def test_get_sample_resources(self):
|
||||||
|
polling_tasks = self.mgr.setup_polling_tasks()
|
||||||
|
self.mgr.interval_task(polling_tasks.values()[0])
|
||||||
|
self.assertTrue(self.Pollster.resources)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user