Partition static resources defined in pipeline.yaml

Resources statically defined in pipeline.yaml are currently not
subject to workload partitioning, so the agents cannot be run in a
highly available (HA) configuration: when multiple agents run with the
same pipeline.yaml, each of them polls every static resource and the
samples are duplicated.

This patch partitions the static resources as well.

Closes-bug: #1369538
Change-Id: Iff3b33db58302fb2e89b1b3722937a031a70be5f
Nejc Saje, 2014-09-15 08:26:59 -04:00 (committed by Nejc Saje)
parent d8317189e5, commit fa3f4d6c5d
4 changed files with 77 additions and 7 deletions
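
How the fix works, in outline: each pipeline's static resource list is
hashed into a stable group id, every agent joins that group, and the
partition coordinator hands each member a disjoint subset of the list.
Below is a minimal, self-contained sketch of that idea only, not the
Ceilometer implementation: extract_my_subset here is a simplified
modulo-based stand-in for the coordinator's hash-ring logic, and the
agent ids and resource names are made up.

def hash_of_set(s):
    # Order- and duplicate-insensitive, so agents with identical
    # pipeline.yaml files compute identical group ids.
    return str(hash(frozenset(s)))

def extract_my_subset(my_id, members, resources):
    # Every agent evaluates the same deterministic split, so each
    # resource is polled by exactly one group member.
    my_index = sorted(members).index(my_id)
    return [r for i, r in enumerate(sorted(resources))
            if i % len(members) == my_index]

members = ['agent-a', 'agent-b']             # hypothetical agent ids
static_resources = ['r1', 'r2', 'r3', 'r4']  # hypothetical resources
group_id = 'central-' + hash_of_set(static_resources)
print(extract_my_subset('agent-a', members, static_resources))  # ['r1', 'r3']
print(extract_my_subset('agent-b', members, static_resources))  # ['r2', 'r4']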

ceilometer/agent.py

@@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import service as os_service
 from ceilometer import pipeline as publish_pipeline
+from ceilometer import utils

 LOG = log.getLogger(__name__)
@@ -53,7 +54,14 @@ class Resources(object):
         source_discovery = (self.agent_manager.discover(self._discovery,
                                                         discovery_cache)
                             if self._discovery else [])
-        return self._resources + source_discovery
+        static_resources = []
+        if self._resources:
+            static_resources_group = self.agent_manager.construct_group_id(
+                utils.hash_of_set(self._resources))
+            p_coord = self.agent_manager.partition_coordinator
+            static_resources = p_coord.extract_my_subset(
+                static_resources_group, self._resources)
+        return static_resources + source_discovery

     @staticmethod
     def key(source, pollster):
@@ -145,8 +153,15 @@ class AgentManager(os_service.Service):
         )

     def join_partitioning_groups(self):
-        groups = set([self._construct_group_id(d.obj.group_id)
+        groups = set([self.construct_group_id(d.obj.group_id)
                       for d in self.discovery_manager])
+        # let each set of statically-defined resources have its own group
+        static_resource_groups = set([
+            self.construct_group_id(utils.hash_of_set(p.resources))
+            for p in self.pipeline_manager.pipelines
+            if p.resources
+        ])
+        groups.update(static_resource_groups)
         for group in groups:
             self.partition_coordinator.join_group(group)
@@ -168,7 +183,7 @@ class AgentManager(os_service.Service):
         return polling_tasks

-    def _construct_group_id(self, discovery_group_id):
+    def construct_group_id(self, discovery_group_id):
         return ('%s-%s' % (self.group_prefix,
                            discovery_group_id)
                 if discovery_group_id else None)
@@ -217,7 +232,7 @@ class AgentManager(os_service.Service):
             try:
                 discovered = discoverer.discover(self, param)
                 partitioned = self.partition_coordinator.extract_my_subset(
-                    self._construct_group_id(discoverer.group_id),
+                    self.construct_group_id(discoverer.group_id),
                     discovered)
                 resources.extend(partitioned)
                 if discovery_cache is not None:
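
For reference, construct_group_id (renamed from _construct_group_id
and made public above so that Resources can call it) simply namespaces
a group id with the agent's group_prefix and returns None when there
is nothing to coordinate on. A standalone restatement of that logic,
with a hypothetical 'central' prefix:

def construct_group_id(group_prefix, discovery_group_id):
    # Same logic as the method above, with the prefix passed explicitly;
    # None means "no group", so ungrouped discoverers skip coordination.
    return ('%s-%s' % (group_prefix, discovery_group_id)
            if discovery_group_id else None)

assert construct_group_id('central', 'abc123') == 'central-abc123'
assert construct_group_id('central', None) is None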

ceilometer/tests/agentbase.py

@@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher
 from ceilometer import sample
 from ceilometer.tests import base
 from ceilometer import transformer
+from ceilometer import utils


 class TestSample(sample.Sample):
@@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.mgr.discovery_manager = self.create_discovery_manager()
         self.mgr.join_partitioning_groups()
         p_coord = self.mgr.partition_coordinator
-        expected = [mock.call(self.mgr._construct_group_id(g))
-                    for g in ['another_group', 'global']]
+        static_group_ids = [utils.hash_of_set(p['resources'])
+                            for p in self.pipeline_cfg
+                            if p['resources']]
+        expected = [mock.call(self.mgr.construct_group_id(g))
+                    for g in ['another_group', 'global'] + static_group_ids]
         self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
         for c in expected:
             self.assertIn(c, p_coord.join_group.call_args_list)
@@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
                                              'testdiscoveryanother',
                                              'testdiscoverynonexistent',
                                              'testdiscoveryexception']
+        self.pipeline_cfg[0]['resources'] = []
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
-        expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id),
+        expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
                               d.obj.resources)
                     for d in self.mgr.discovery_manager
                     if hasattr(d.obj, 'resources')]
@@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
                          len(p_coord.extract_my_subset.call_args_list))
         for c in expected:
             self.assertIn(c, p_coord.extract_my_subset.call_args_list)
+
+    def test_static_resources_partitioning(self):
+        p_coord = self.mgr.partition_coordinator
+        self.mgr.default_discovery = []
+        static_resources = ['static_1', 'static_2']
+        static_resources2 = ['static_3', 'static_4']
+        self.pipeline_cfg[0]['resources'] = static_resources
+        self.pipeline_cfg.append({
+            'name': "test_pipeline2",
+            'interval': 60,
+            'counters': ['test', 'test2'],
+            'resources': static_resources2,
+            'transformers': [],
+            'publishers': ["test"],
+        })
+        # have one pipeline without static resources defined
+        self.pipeline_cfg.append({
+            'name': "test_pipeline3",
+            'interval': 60,
+            'counters': ['test', 'test2'],
+            'resources': [],
+            'transformers': [],
+            'publishers': ["test"],
+        })
+        self.setup_pipeline()
+        polling_tasks = self.mgr.setup_polling_tasks()
+        self.mgr.interval_task(polling_tasks.get(60))
+        # Only two groups need to be created, one for each pipeline
+        # with static resources, even though counter 'test' is used twice
+        expected = [mock.call(self.mgr.construct_group_id(
+                              utils.hash_of_set(resources)),
+                              resources)
+                    for resources in [static_resources,
+                                      static_resources2]]
+        self.assertEqual(len(expected),
+                         len(p_coord.extract_my_subset.call_args_list))
+        for c in expected:
+            self.assertIn(c, p_coord.extract_my_subset.call_args_list)
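
The arithmetic behind the test's expectation: three pipelines are
configured, but a pipeline whose resources list is empty joins no
group, and identical resource lists would collapse into a single set
entry, so exactly two extract_my_subset calls are expected. A short
sketch of that filtering, reusing the hash_of_set helper this patch
adds:

def hash_of_set(s):
    return str(hash(frozenset(s)))

pipelines = [['static_1', 'static_2'],  # test_pipeline
             ['static_3', 'static_4'],  # test_pipeline2
             []]                        # test_pipeline3: no static resources
groups = set(hash_of_set(r) for r in pipelines if r)
assert len(groups) == 2  # one group per non-empty resource set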

ceilometer/tests/test_utils.py

@@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase):
                           ('nested2[1].c', 'B')],
                          sorted(pairs, key=lambda x: x[0]))

+    def test_hash_of_set(self):
+        x = ['a', 'b']
+        y = ['a', 'b', 'a']
+        z = ['a', 'c']
+        self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
+        self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
+        self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
+
     def test_hash_ring(self):
         num_nodes = 10
         num_keys = 1000

ceilometer/utils.py

@@ -206,6 +206,10 @@ def uniq(dupes, attrs):
     return deduped


+def hash_of_set(s):
+    return str(hash(frozenset(s)))
+
+
 class HashRing(object):
     def __init__(self, nodes, replicas=100):
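
One property of hash_of_set worth noting: the scheme only works if
every agent computes the same id for the same resource list. On the
Python 2 interpreters Ceilometer targeted at the time, hash() of
strings is deterministic across processes by default; under Python 3's
default hash randomization the same frozenset hashes differently in
each process unless PYTHONHASHSEED is pinned. A hedged sketch of a
process-stable alternative (an illustration, not what this patch does):

import hashlib

def portable_hash_of_set(s):
    # Stable across processes and interpreter versions: sort the unique
    # elements and hash their joined string representation.
    return hashlib.md5(','.join(sorted(set(s))).encode()).hexdigest()

assert portable_hash_of_set(['a', 'b']) == portable_hash_of_set(['b', 'a', 'a'])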