Make compute discovery pollster-based, not agent-level
Partially-Implements-Blueprint: merge-compute-central-agents Change-Id: I02c9a12b0fcd55a7d59688853f3c08e505b4bf3f
This commit is contained in:
parent
28a253104a
commit
633c3db1e8
@ -101,7 +101,6 @@ class PollingTask(object):
|
||||
|
||||
def poll_and_publish(self):
|
||||
"""Polling sample and publish into pipeline."""
|
||||
agent_resources = self.manager.discover()
|
||||
cache = {}
|
||||
discovery_cache = {}
|
||||
for source, pollster in self.pollster_matches:
|
||||
@ -113,8 +112,7 @@ class PollingTask(object):
|
||||
[pollster.obj.default_discovery], discovery_cache)
|
||||
key = Resources.key(source, pollster)
|
||||
source_resources = list(self.resources[key].get(discovery_cache))
|
||||
polling_resources = (source_resources or pollster_resources or
|
||||
agent_resources)
|
||||
polling_resources = (source_resources or pollster_resources)
|
||||
if not polling_resources:
|
||||
LOG.info(_("Skip polling pollster %s, no resources found"),
|
||||
pollster.name)
|
||||
@ -136,10 +134,8 @@ class PollingTask(object):
|
||||
|
||||
class AgentManager(os_service.Service):
|
||||
|
||||
def __init__(self, namespace, default_discovery=None, group_prefix=None):
|
||||
def __init__(self, namespace, group_prefix=None):
|
||||
super(AgentManager, self).__init__()
|
||||
default_discovery = default_discovery or []
|
||||
self.default_discovery = default_discovery
|
||||
self.pollster_manager = self._extensions('poll', namespace)
|
||||
self.discovery_manager = self._extensions('discover')
|
||||
self.context = context.RequestContext('admin', 'admin', is_admin=True)
|
||||
@ -226,7 +222,8 @@ class AgentManager(os_service.Service):
|
||||
|
||||
def discover(self, discovery=None, discovery_cache=None):
|
||||
resources = []
|
||||
for url in (discovery or self.default_discovery):
|
||||
discovery = discovery or []
|
||||
for url in discovery:
|
||||
if discovery_cache is not None and url in discovery_cache:
|
||||
resources.extend(discovery_cache[url])
|
||||
continue
|
||||
|
@ -25,7 +25,7 @@ LOG = log.getLogger(__name__)
|
||||
class AgentManager(agent.AgentManager):
|
||||
|
||||
def __init__(self):
|
||||
super(AgentManager, self).__init__('compute', ['local_instances'])
|
||||
super(AgentManager, self).__init__('compute')
|
||||
self._inspector = virt_inspector.get_hypervisor_inspector()
|
||||
|
||||
@property
|
||||
|
@ -34,8 +34,7 @@ class ComputePollster(plugin.PollsterBase):
|
||||
|
||||
@property
|
||||
def default_discovery(self):
|
||||
# get resources from agent-default discovery
|
||||
return None
|
||||
return 'local_instances'
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_samples(self, manager, cache, resources):
|
||||
|
@ -425,71 +425,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.assertEqual(expected, self.DiscoveryAnother.params)
|
||||
self.assertEqual(expected, self.DiscoveryException.params)
|
||||
|
||||
def _do_test_per_agent_discovery(self,
|
||||
discovered_resources,
|
||||
static_resources):
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
if discovered_resources:
|
||||
self.mgr.default_discovery = [d.name
|
||||
for d in self.mgr.discovery_manager]
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.DiscoveryAnother.resources = [d[::-1]
|
||||
for d in discovered_resources]
|
||||
self.pipeline_cfg[0]['resources'] = static_resources
|
||||
self.setup_pipeline()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self._verify_discovery_params([None] if discovered_resources else [])
|
||||
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
|
||||
# compare resource lists modulo ordering
|
||||
self.assertEqual(set(static_resources or discovery),
|
||||
set(self.Pollster.resources))
|
||||
|
||||
def test_per_agent_discovery_discovered_only(self):
|
||||
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
|
||||
[])
|
||||
|
||||
def test_per_agent_discovery_static_only(self):
|
||||
self._do_test_per_agent_discovery([],
|
||||
['static_1', 'static_2'])
|
||||
|
||||
def test_per_agent_discovery_discovered_overridden_by_static(self):
|
||||
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
|
||||
['static_1', 'static_2'])
|
||||
|
||||
def test_per_agent_discovery_overridden_by_per_pollster_discovery(self):
|
||||
discovered_resources = ['discovered_1', 'discovered_2']
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
self.mgr.default_discovery = ['testdiscoveryanother',
|
||||
'testdiscoverynonexistent',
|
||||
'testdiscoveryexception']
|
||||
self.pipeline_cfg[0]['resources'] = []
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.DiscoveryAnother.resources = [d[::-1]
|
||||
for d in discovered_resources]
|
||||
self.setup_pipeline()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual(set(self.Discovery.resources),
|
||||
set(self.Pollster.resources))
|
||||
|
||||
def test_per_agent_discovery_overridden_by_per_pipeline_discovery(self):
|
||||
discovered_resources = ['discovered_1', 'discovered_2']
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.DiscoveryAnother.resources = [d[::-1]
|
||||
for d in discovered_resources]
|
||||
self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother',
|
||||
'testdiscoverynonexistent',
|
||||
'testdiscoveryexception']
|
||||
self.pipeline_cfg[0]['resources'] = []
|
||||
self.setup_pipeline()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual(set(self.DiscoveryAnother.resources),
|
||||
set(self.Pollster.resources))
|
||||
|
||||
def _do_test_per_pollster_discovery(self, discovered_resources,
|
||||
static_resources):
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
@ -705,7 +640,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
|
||||
def test_static_resources_partitioning(self):
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
self.mgr.default_discovery = []
|
||||
static_resources = ['static_1', 'static_2']
|
||||
static_resources2 = ['static_3', 'static_4']
|
||||
self.pipeline_cfg[0]['resources'] = static_resources
|
||||
|
@ -18,76 +18,18 @@
|
||||
"""
|
||||
import mock
|
||||
from oslotest import base
|
||||
from oslotest import mockpatch
|
||||
|
||||
from ceilometer import agent
|
||||
from ceilometer.compute import manager
|
||||
from ceilometer import nova_client
|
||||
from ceilometer.tests import agentbase
|
||||
|
||||
|
||||
class TestManager(base.BaseTestCase):
|
||||
"""Test that compute manager loads some pollsters.
|
||||
|
||||
There is no need to test how does compute manager setups pollstering
|
||||
process, as it's actually the same that is done by base manager tests.
|
||||
"""
|
||||
|
||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
def test_load_plugins(self):
|
||||
mgr = manager.AgentManager()
|
||||
self.assertIsNotNone(list(mgr.pollster_manager))
|
||||
|
||||
|
||||
class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||
|
||||
def _fake_instance(self, name, state):
|
||||
instance = mock.MagicMock()
|
||||
instance.name = name
|
||||
setattr(instance, 'OS-EXT-STS:vm_state', state)
|
||||
return instance
|
||||
|
||||
def _raise_exception(self):
|
||||
raise Exception
|
||||
|
||||
@staticmethod
|
||||
def create_manager():
|
||||
return manager.AgentManager()
|
||||
|
||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
def setUp(self):
|
||||
self.source_resources = False
|
||||
super(TestRunTasks, self).setUp()
|
||||
|
||||
# Set up a fake instance value to be returned by
|
||||
# instance_get_all_by_host() so when the manager gets the list
|
||||
# of instances to poll we can control the results.
|
||||
self.instances = [self._fake_instance('doing', 'active'),
|
||||
self._fake_instance('resting', 'paused')]
|
||||
stillborn_instance = self._fake_instance('stillborn', 'error')
|
||||
|
||||
self.useFixture(mockpatch.PatchObject(
|
||||
nova_client.Client,
|
||||
'instance_get_all_by_host',
|
||||
side_effect=lambda *x: self.instances + [stillborn_instance]))
|
||||
|
||||
def test_setup_polling_tasks(self):
|
||||
super(TestRunTasks, self).test_setup_polling_tasks()
|
||||
self.assertEqual(self.Pollster.samples[0][1], self.instances)
|
||||
|
||||
def test_interval_exception_isolation(self):
|
||||
super(TestRunTasks, self).test_interval_exception_isolation()
|
||||
self.assertEqual(1, len(self.PollsterException.samples))
|
||||
self.assertEqual(1, len(self.PollsterExceptionAnother.samples))
|
||||
|
||||
def test_manager_exception_persistency(self):
|
||||
super(TestRunTasks, self).test_manager_exception_persistency()
|
||||
with mock.patch.object(nova_client.Client, 'instance_get_all_by_host',
|
||||
side_effect=lambda *x: self._raise_exception()):
|
||||
mgr = manager.AgentManager()
|
||||
polling_task = agent.PollingTask(mgr)
|
||||
polling_task.poll_and_publish()
|
||||
|
||||
def test_local_instances_default_agent_discovery(self):
|
||||
self.setup_pipeline()
|
||||
self.assertEqual(self.mgr.default_discovery, ['local_instances'])
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self._verify_discovery_params([])
|
||||
self.assertEqual(set(self.Pollster.resources),
|
||||
set(self.instances))
|
||||
|
Loading…
x
Reference in New Issue
Block a user