diff --git a/ceilometer/central/manager.py b/ceilometer/central/manager.py index 4d3bbaac0..82787c494 100644 --- a/ceilometer/central/manager.py +++ b/ceilometer/central/manager.py @@ -16,7 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. -import pkg_resources +from stevedore import extension from nova import manager @@ -33,43 +33,40 @@ PLUGIN_NAMESPACE = 'ceilometer.poll.central' class AgentManager(manager.Manager): def init_host(self): - self._load_plugins() + # FIXME(dhellmann): Currently assumes all plugins are + # enabled when they are discovered and + # importable. Need to add check against global + # configuration flag and check that asks the plugin if + # it should be enabled. + self.ext_manager = extension.ExtensionManager( + namespace=PLUGIN_NAMESPACE, + invoke_on_load=True, + ) return - def _load_plugins(self): - self.pollsters = [] - for ep in pkg_resources.iter_entry_points(PLUGIN_NAMESPACE): - try: - plugin_class = ep.load() - plugin = plugin_class() - # FIXME(dhellmann): Currently assumes all plugins are - # enabled when they are discovered and - # importable. Need to add check against global - # configuration flag and check that asks the plugin if - # it should be enabled. - self.pollsters.append((ep.name, plugin)) - LOG.info('loaded pollster %s:%s', - PLUGIN_NAMESPACE, ep.name) - except Exception as err: - LOG.warning('Failed to load pollster %s:%s', - ep.name, err) - LOG.exception(err) - if not self.pollsters: - LOG.warning('Failed to load any pollsters for %s', - PLUGIN_NAMESPACE) - return + @staticmethod + def publish_counters_from_one_pollster(ext, manager, context): + """Used to invoke the plugins loaded by the ExtensionManager. + """ + try: + LOG.info('polling %s', ext.name) + for c in ext.obj.get_counters(manager, context): + LOG.info('COUNTER: %s', c) + publish.publish_counter(context=context, + counter=c, + topic=cfg.CONF.metering_topic, + secret=cfg.CONF.metering_secret, + source=cfg.CONF.counter_source, + ) + except Exception as err: + LOG.warning('Continuing after error from %s: %s', + ext.name, err) + LOG.exception(err) def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" - for name, pollster in self.pollsters: - try: - LOG.info('polling %s', name) - for c in pollster.get_counters(self, context): - LOG.info('COUNTER: %s', c) - publish.publish_counter(context, c, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - ) - except Exception as err: - LOG.warning('Continuing after error from %s: %s', name, err) - LOG.exception(err) + self.ext_manager.map(self.publish_counters_from_one_pollster, + manager=self, + context=context, + ) + return diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index b2052dac6..a2ebc1db2 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -16,9 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. -import functools -import itertools -import pkg_resources +from stevedore import extension from nova import manager @@ -45,60 +43,24 @@ class CollectorManager(manager.Manager): COLLECTOR_NAMESPACE = 'ceilometer.collector' - @staticmethod - def _load_plugins(plugin_namespace): - handlers = [] - # Listen for notifications from nova - for ep in pkg_resources.iter_entry_points(plugin_namespace): - LOG.info('attempting to load notification handler for %s:%s', - plugin_namespace, ep.name) - try: - # FIXME(dhellmann): Currently assumes all plugins are - # enabled when they are discovered and - # importable. Need to add check against global - # configuration flag and check that asks the plugin if - # it should be enabled. - plugin_class = ep.load() - plugin = plugin_class() - handlers.append(plugin) - except Exception as err: - LOG.warning('Failed to load notification handler %s: %s', - ep.name, err) - LOG.exception(err) - return handlers - def init_host(self): - # Use the nova configuration flags to get - # a connection to the RPC mechanism nova - # is using. + # FIXME(dhellmann): Update Manager API to get Service instance + # with existing rpc handle. self.connection = rpc.create_connection() storage.register_opts(cfg.CONF) self.storage_engine = storage.get_engine(cfg.CONF) self.storage_conn = self.storage_engine.get_connection(cfg.CONF) - self.handlers = self._load_plugins(self.COLLECTOR_NAMESPACE) + self.ext_manager = extension.ExtensionManager(self.COLLECTOR_NAMESPACE, + invoke_on_load=True, + ) - if not self.handlers: + if not list(self.ext_manager): LOG.warning('Failed to load any notification handlers for %s', - self.plugin_namespace) + self.COLLECTOR_NAMESPACE) - # FIXME(dhellmann): Should be using create_worker(), except - # that notification messages do not conform to the RPC - # invocation protocol (they do not include a "method" - # parameter). - # FIXME(dhellmann): Break this out into its own method - # so we can test the subscription logic. - for handler in self.handlers: - LOG.debug('Event types: %r', handler.get_event_types()) - for exchange_topic in handler.get_exchange_topics(cfg.CONF): - for topic in exchange_topic.topics: - self.connection.declare_topic_consumer( - queue_name="ceilometer.notifications", - topic=topic, - exchange_name=exchange_topic.exchange, - callback=self.process_notification, - ) + self.ext_manager.map(self._setup_subscription) # Set ourselves up as a separate worker for the metering data, # since the default for manager is to use create_consumer(). @@ -110,15 +72,36 @@ class CollectorManager(manager.Manager): self.connection.consume_in_thread() + def _setup_subscription(self, ext, *args, **kwds): + handler = ext.obj + LOG.debug('Event types: %r', handler.get_event_types()) + for exchange_topic in handler.get_exchange_topics(cfg.CONF): + for topic in exchange_topic.topics: + # FIXME(dhellmann): Should be using create_worker(), except + # that notification messages do not conform to the RPC + # invocation protocol (they do not include a "method" + # parameter). + self.connection.declare_topic_consumer( + queue_name="ceilometer.notifications", + topic=topic, + exchange_name=exchange_topic.exchange, + callback=self.process_notification, + ) + def process_notification(self, notification): """Make a notification processed by an handler.""" LOG.debug('notification %r', notification.get('event_type')) - for handler in self.handlers: - if notification['event_type'] in handler.get_event_types(): - for c in handler.process_notification(notification): - LOG.info('COUNTER: %s', c) - # FIXME(dhellmann): Spawn green thread? - self.publish_counter(c) + self.ext_manager.map(self._process_notification_for_ext, + notification=notification, + ) + + def _process_notification_for_ext(self, ext, notification): + handler = ext.obj + if notification['event_type'] in handler.get_event_types(): + for c in handler.process_notification(notification): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Spawn green thread? + self.publish_counter(c) @staticmethod def publish_counter(counter): diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index 137a5cf8e..f27dd46bb 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -16,7 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. -import pkg_resources +from stevedore import extension from nova import manager @@ -33,47 +33,41 @@ PLUGIN_NAMESPACE = 'ceilometer.poll.compute' class AgentManager(manager.Manager): def init_host(self): - self._load_plugins() + # FIXME(dhellmann): Currently assumes all plugins are + # enabled when they are discovered and + # importable. Need to add check against global + # configuration flag and check that asks the plugin if + # it should be enabled. + self.ext_manager = extension.ExtensionManager( + namespace=PLUGIN_NAMESPACE, + invoke_on_load=True, + ) return - def _load_plugins(self): - self.pollsters = [] - for ep in pkg_resources.iter_entry_points(PLUGIN_NAMESPACE): - try: - plugin_class = ep.load() - plugin = plugin_class() - # FIXME(dhellmann): Currently assumes all plugins are - # enabled when they are discovered and - # importable. Need to add check against global - # configuration flag and check that asks the plugin if - # it should be enabled. - self.pollsters.append((ep.name, plugin)) - LOG.info('loaded pollster %s:%s', - PLUGIN_NAMESPACE, ep.name) - except Exception as err: - LOG.warning('Failed to load pollster %s:%s', - ep.name, err) - LOG.exception(err) - if not self.pollsters: - LOG.warning('Failed to load any pollsters for %s', - PLUGIN_NAMESPACE) - return + @staticmethod + def publish_counters_from_one_pollster(ext, manager, context, instance): + """Used to invoke the plugins loaded by the ExtensionManager. + """ + try: + LOG.info('polling %s', ext.name) + for c in ext.obj.get_counters(manager, instance): + LOG.info('COUNTER: %s', c) + publish.publish_counter(context, c, + cfg.CONF.metering_topic, + cfg.CONF.metering_secret, + ) + except Exception as err: + LOG.warning('Continuing after error from %s for %s: %s', + ext.name, instance.id, err) + LOG.exception(err) def poll_instance(self, context, instance): """Poll one instance.""" - for name, pollster in self.pollsters: - try: - LOG.info('polling %s', name) - for c in pollster.get_counters(self, instance): - LOG.info('COUNTER: %s', c) - publish.publish_counter(context, c, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - ) - except Exception as err: - LOG.warning('Continuing after error from %s for %s: %s', - name, instance.name, err) - LOG.exception(err) + self.ext_manager.map(self.publish_counters_from_one_pollster, + manager=self, + context=context, + instance=instance, + ) def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 529b84b23..6ec54cb8b 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -20,12 +20,15 @@ from datetime import datetime +from mock import patch + +from stevedore import extension +from stevedore.tests import manager as test_manager + from ceilometer import meter from ceilometer.collector import manager from ceilometer.openstack.common import cfg from ceilometer.storage import base -from ceilometer.openstack.common import rpc -from ceilometer.openstack.common import cfg from ceilometer.tests import base as tests_base from ceilometer.compute import notifications @@ -80,17 +83,6 @@ TEST_NOTICE = { } -class StubConnection(object): - def declare_topic_consumer(*args, **kwargs): - pass - - def create_worker(*args, **kwargs): - pass - - def consume_in_thread(self): - pass - - class TestCollectorManager(tests_base.TestCase): def setUp(self): @@ -100,9 +92,12 @@ class TestCollectorManager(tests_base.TestCase): #cfg.CONF.metering_secret = 'not-so-secret' def test_init_host(self): - self.stubs.Set(rpc, 'create_connection', lambda: StubConnection()) cfg.CONF.database_connection = 'log://localhost' - self.mgr.init_host() + # If we try to create a real RPC connection, init_host() never + # returns. Mock it out so we can establish the manager + # configuration. + with patch('ceilometer.openstack.common.rpc.create_connection'): + self.mgr.init_host() def test_valid_message(self): msg = {'counter_name': 'test', @@ -186,18 +181,20 @@ class TestCollectorManager(tests_base.TestCase): self.mgr.record_metering_data(self.ctx, msg) self.mox.VerifyAll() - def test_load_plugins(self): - results = self.mgr._load_plugins(self.mgr.COLLECTOR_NAMESPACE) - self.assert_(len(results) > 0) - - def test_load_no_plugins(self): - results = self.mgr._load_plugins("foobar.namespace") - self.assertEqual(results, []) - def test_process_notification(self): + # If we try to create a real RPC connection, init_host() never + # returns. Mock it out so we can establish the manager + # configuration. + with patch('ceilometer.openstack.common.rpc.create_connection'): + self.mgr.init_host() results = [] - self.stubs.Set(self.mgr, 'publish_counter', - lambda counter: results.append(counter)) - self.mgr.handlers = [notifications.Instance()] + self.stubs.Set(self.mgr, 'publish_counter', results.append) + self.mgr.ext_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + notifications.Instance(), + ), + ]) self.mgr.process_notification(TEST_NOTICE) self.assert_(len(results) >= 1) diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index d4a64708c..830b072d2 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -20,6 +20,8 @@ import datetime +from stevedore import extension + from ceilometer.compute import manager from ceilometer import counter from ceilometer import publish @@ -31,7 +33,7 @@ from ceilometer.openstack.common import cfg def test_load_plugins(): mgr = manager.AgentManager() mgr.init_host() - assert mgr.pollsters, 'Failed to load any plugins' + assert list(mgr.ext_manager), 'Failed to load any plugins' return @@ -63,7 +65,15 @@ class TestRunTasks(base.TestCase): self.notifications = [] self.stubs.Set(publish, 'publish_counter', self.faux_notify) self.mgr = manager.AgentManager() - self.mgr.pollsters = [('test', self.Pollster())] + self.mgr.ext_manager = extension.ExtensionManager('fake', + invoke_on_load=False, + ) + self.mgr.ext_manager.extensions = [extension.Extension('test', + None, + None, + self.Pollster(), + ), + ] # Set up a fake instance value to be returned by # instance_get_all_by_host() so when the manager gets the list # of instances to poll we can control the results. diff --git a/tests/compute/test_nova_notifier.py b/tests/compute/test_nova_notifier.py index 8a47e6ee6..07975a4cc 100644 --- a/tests/compute/test_nova_notifier.py +++ b/tests/compute/test_nova_notifier.py @@ -21,6 +21,9 @@ import mock import datetime +from stevedore import extension +from stevedore.tests import manager as test_manager + from nova import flags from nova import db from nova import context @@ -124,7 +127,14 @@ class TestNovaNotifier(base.TestCase): self.stubs.Set(publish, 'publish_counter', self.do_nothing) nova_notifier._initialize_config_options = False nova_notifier.initialize_manager() - nova_notifier._agent_manager.pollsters = [('test', self.Pollster())] + nova_notifier._agent_manager.ext_manager = \ + test_manager.TestExtensionManager([ + extension.Extension('test', + None, + None, + self.Pollster(), + ), + ]) def tearDown(self): self.Pollster.counters = [] diff --git a/tools/pip-requires b/tools/pip-requires index da8a6c492..38423ade6 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -8,6 +8,6 @@ sqlalchemy eventlet anyjson>=0.3.1 Flask==0.9 -stevedore>=0.4 +stevedore>=0.5 python-glanceclient python-cinderclient