From fa3f4d6c5dd724a1fb2b811cd798fb80d075192e Mon Sep 17 00:00:00 2001 From: Nejc Saje Date: Mon, 15 Sep 2014 08:26:59 -0400 Subject: [PATCH] Partition static resources defined in pipeline.yaml Resources statically defined in pipeline.yaml are currently not subject to workload partitioning, so we can't do HA. If we have multiple agents running with the same pipeline.yaml, the samples will be duplicated. This patch partitions the static resources as well. Closes-bug: #1369538 Change-Id: Iff3b33db58302fb2e89b1b3722937a031a70be5f --- ceilometer/agent.py | 23 +++++++++++++--- ceilometer/tests/agentbase.py | 49 +++++++++++++++++++++++++++++++--- ceilometer/tests/test_utils.py | 8 ++++++ ceilometer/utils.py | 4 +++ 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/ceilometer/agent.py b/ceilometer/agent.py index cc0067a70..dad18f2e7 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline as publish_pipeline +from ceilometer import utils LOG = log.getLogger(__name__) @@ -53,7 +54,14 @@ class Resources(object): source_discovery = (self.agent_manager.discover(self._discovery, discovery_cache) if self._discovery else []) - return self._resources + source_discovery + static_resources = [] + if self._resources: + static_resources_group = self.agent_manager.construct_group_id( + utils.hash_of_set(self._resources)) + p_coord = self.agent_manager.partition_coordinator + static_resources = p_coord.extract_my_subset( + static_resources_group, self._resources) + return static_resources + source_discovery @staticmethod def key(source, pollster): @@ -145,8 +153,15 @@ class AgentManager(os_service.Service): ) def join_partitioning_groups(self): - groups = set([self._construct_group_id(d.obj.group_id) + groups = set([self.construct_group_id(d.obj.group_id) for d in self.discovery_manager]) + # let each set of statically-defined resources have its own group + static_resource_groups = set([ + self.construct_group_id(utils.hash_of_set(p.resources)) + for p in self.pipeline_manager.pipelines + if p.resources + ]) + groups.update(static_resource_groups) for group in groups: self.partition_coordinator.join_group(group) @@ -168,7 +183,7 @@ class AgentManager(os_service.Service): return polling_tasks - def _construct_group_id(self, discovery_group_id): + def construct_group_id(self, discovery_group_id): return ('%s-%s' % (self.group_prefix, discovery_group_id) if discovery_group_id else None) @@ -217,7 +232,7 @@ class AgentManager(os_service.Service): try: discovered = discoverer.discover(self, param) partitioned = self.partition_coordinator.extract_my_subset( - self._construct_group_id(discoverer.group_id), + self.construct_group_id(discoverer.group_id), discovered) resources.extend(partitioned) if discovery_cache is not None: diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 39e8d7543..b658bea52 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import transformer +from ceilometer import utils class TestSample(sample.Sample): @@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.mgr.discovery_manager = self.create_discovery_manager() self.mgr.join_partitioning_groups() p_coord = self.mgr.partition_coordinator - expected = [mock.call(self.mgr._construct_group_id(g)) - for g in ['another_group', 'global']] + static_group_ids = [utils.hash_of_set(p['resources']) + for p in self.pipeline_cfg + if p['resources']] + expected = [mock.call(self.mgr.construct_group_id(g)) + for g in ['another_group', 'global'] + static_group_ids] self.assertEqual(len(expected), len(p_coord.join_group.call_args_list)) for c in expected: self.assertIn(c, p_coord.join_group.call_args_list) @@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] + self.pipeline_cfg[0]['resources'] = [] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id), + expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id), d.obj.resources) for d in self.mgr.discovery_manager if hasattr(d.obj, 'resources')] @@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase): len(p_coord.extract_my_subset.call_args_list)) for c in expected: self.assertIn(c, p_coord.extract_my_subset.call_args_list) + + def test_static_resources_partitioning(self): + p_coord = self.mgr.partition_coordinator + self.mgr.default_discovery = [] + static_resources = ['static_1', 'static_2'] + static_resources2 = ['static_3', 'static_4'] + self.pipeline_cfg[0]['resources'] = static_resources + self.pipeline_cfg.append({ + 'name': "test_pipeline2", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': static_resources2, + 'transformers': [], + 'publishers': ["test"], + }) + # have one pipeline without static resources defined + self.pipeline_cfg.append({ + 'name': "test_pipeline3", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': [], + 'transformers': [], + 'publishers': ["test"], + }) + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + # Only two groups need to be created, one for each pipeline, + # even though counter test is used twice + expected = [mock.call(self.mgr.construct_group_id( + utils.hash_of_set(resources)), + resources) + for resources in [static_resources, + static_resources2]] + self.assertEqual(len(expected), + len(p_coord.extract_my_subset.call_args_list)) + for c in expected: + self.assertIn(c, p_coord.extract_my_subset.call_args_list) diff --git a/ceilometer/tests/test_utils.py b/ceilometer/tests/test_utils.py index f9a648102..bda3de191 100644 --- a/ceilometer/tests/test_utils.py +++ b/ceilometer/tests/test_utils.py @@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase): ('nested2[1].c', 'B')], sorted(pairs, key=lambda x: x[0])) + def test_hash_of_set(self): + x = ['a', 'b'] + y = ['a', 'b', 'a'] + z = ['a', 'c'] + self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y)) + self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z)) + self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z)) + def test_hash_ring(self): num_nodes = 10 num_keys = 1000 diff --git a/ceilometer/utils.py b/ceilometer/utils.py index c703521bc..016fb8a32 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -206,6 +206,10 @@ def uniq(dupes, attrs): return deduped +def hash_of_set(s): + return str(hash(frozenset(s))) + + class HashRing(object): def __init__(self, nodes, replicas=100):