diff --git a/bin/ceilometer-agent-central b/bin/ceilometer-agent-central index c52b0d32e..68742f962 100755 --- a/bin/ceilometer-agent-central +++ b/bin/ceilometer-agent-central @@ -21,19 +21,18 @@ import eventlet eventlet.monkey_patch() import sys +from ceilometer import service as ceilo_service +from ceilometer.central import manager from ceilometer.service import prepare_service from ceilometer.openstack.common import cfg -from nova import service - +from ceilometer.openstack.common import service if __name__ == '__main__': prepare_service(sys.argv) - server = \ - service.Service.create( - binary='ceilometer-agent-central', - topic='ceilometer.agent.central', - manager='ceilometer.central.manager.AgentManager', - periodic_interval=cfg.CONF.periodic_interval) - service.serve(server) - service.wait() + mgr = manager.AgentManager() + topic = 'ceilometer.agent.central' + ceilo = ceilo_service.PeriodicService(cfg.CONF.host, + topic, mgr) + launcher = service.launch(ceilo) + launcher.wait() diff --git a/bin/ceilometer-agent-compute b/bin/ceilometer-agent-compute index 51e54dcb0..785172b77 100755 --- a/bin/ceilometer-agent-compute +++ b/bin/ceilometer-agent-compute @@ -21,9 +21,13 @@ import eventlet eventlet.monkey_patch() import sys +from ceilometer import service as ceilo_service +from ceilometer.compute import manager from ceilometer.service import prepare_service from ceilometer.openstack.common import cfg -from nova import service +from ceilometer.openstack.common import service + +from nova import flags from nova.compute import manager as compute_manager @@ -35,11 +39,9 @@ if __name__ == '__main__': cfg.CONF.register_opts(compute_manager.compute_opts) prepare_service(sys.argv) - server = \ - service.Service.create( - binary='ceilometer-agent-compute', - topic='ceilometer.agent.compute', - manager='ceilometer.compute.manager.AgentManager', - periodic_interval=cfg.CONF.periodic_interval) - service.serve(server) - service.wait() + mgr = manager.AgentManager() + topic = 'ceilometer.agent.compute' + ceilo = ceilo_service.PeriodicService(cfg.CONF.host, + topic, mgr) + launcher = service.launch(ceilo) + launcher.wait() diff --git a/bin/ceilometer-collector b/bin/ceilometer-collector index 4eb542cb7..386fecee5 100755 --- a/bin/ceilometer-collector +++ b/bin/ceilometer-collector @@ -21,17 +21,17 @@ import eventlet eventlet.monkey_patch() import sys + +from ceilometer.collector import service as coll_service from ceilometer.service import prepare_service from ceilometer.openstack.common import cfg -from nova import service +from ceilometer.openstack.common import service if __name__ == '__main__': prepare_service(sys.argv) - server = \ - service.Service.create(binary='ceilometer-collector', - topic='ceilometer.collector', - manager='ceilometer.collector.' - 'manager.CollectorManager') - service.serve(server) - service.wait() + topic = 'ceilometer.collector' + ceilo = coll_service.CollectorService(cfg.CONF.host, + topic) + launcher = service.launch(ceilo) + launcher.wait() diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/service.py similarity index 92% rename from ceilometer/collector/manager.py rename to ceilometer/collector/service.py index 3aa581268..abe011d50 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/service.py @@ -18,11 +18,10 @@ from stevedore import extension -from nova import manager - from ceilometer import extension_manager from ceilometer import meter from ceilometer import publish +from ceilometer import service from ceilometer import storage from ceilometer.openstack.common import context from ceilometer.openstack.common import cfg @@ -50,14 +49,12 @@ cfg.CONF.register_opts(OPTS) LOG = log.getLogger(__name__) -class CollectorManager(manager.Manager): +class CollectorService(service.PeriodicService): COLLECTOR_NAMESPACE = 'ceilometer.collector' - def init_host(self): - # FIXME(dhellmann): Update Manager API to get Service instance - # with existing rpc handle. - self.connection = rpc.create_connection() + def start(self): + super(CollectorService, self).start() storage.register_opts(cfg.CONF) self.storage_engine = storage.get_engine(cfg.CONF) @@ -75,15 +72,13 @@ class CollectorManager(manager.Manager): self.ext_manager.map(self._setup_subscription) # Set ourselves up as a separate worker for the metering data, - # since the default for manager is to use create_consumer(). - self.connection.create_worker( + # since the default for service is to use create_consumer(). + self.conn.create_worker( cfg.CONF.metering_topic, rpc_dispatcher.RpcDispatcher([self]), 'ceilometer.collector.' + cfg.CONF.metering_topic, ) - self.connection.consume_in_thread() - def _setup_subscription(self, ext, *args, **kwds): handler = ext.obj LOG.debug('Event types from %s: %s', @@ -94,7 +89,7 @@ class CollectorManager(manager.Manager): # that notification messages do not conform to the RPC # invocation protocol (they do not include a "method" # parameter). - self.connection.declare_topic_consumer( + self.conn.declare_topic_consumer( queue_name="ceilometer.notifications", topic=topic, exchange_name=exchange_topic.exchange, @@ -150,3 +145,6 @@ class CollectorManager(manager.Manager): except Exception as err: LOG.error('Failed to record metering data: %s', err) LOG.exception(err) + + def periodic_tasks(self, context): + pass diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index 8d245da50..0ecd138de 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -16,8 +16,6 @@ # License for the specific language governing permissions and limitations # under the License. -from nova import manager - from ceilometer import extension_manager from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log @@ -39,13 +37,11 @@ LOG = log.getLogger(__name__) PLUGIN_NAMESPACE = 'ceilometer.poll.compute' -class AgentManager(manager.Manager): +class AgentManager(object): - def __init__(self, host=None): - super(AgentManager, self).__init__(host=host) + def __init__(self): self.resources = resources.Resources() - def init_host(self): self.ext_manager = extension_manager.ActivatedExtensionManager( namespace=PLUGIN_NAMESPACE, disabled_names=cfg.CONF.disabled_compute_pollsters, diff --git a/ceilometer/compute/nova_notifier.py b/ceilometer/compute/nova_notifier.py index 1e6508da4..543c500c6 100644 --- a/ceilometer/compute/nova_notifier.py +++ b/ceilometer/compute/nova_notifier.py @@ -46,7 +46,6 @@ def initialize_manager(): cfg.CONF(args=[], project='ceilometer', prog='ceilometer-agent') # Instantiate a manager _agent_manager = AgentManager() - _agent_manager.init_host() def notify(context, message): diff --git a/ceilometer/service.py b/ceilometer/service.py index cf115ff77..02a3743b0 100644 --- a/ceilometer/service.py +++ b/ceilometer/service.py @@ -21,8 +21,11 @@ import os from nova import flags -from ceilometer.openstack.common import log from ceilometer.openstack.common import cfg +from ceilometer.openstack.common import context +from ceilometer.openstack.common import log +from ceilometer.openstack.common.rpc import service as rpc_service + cfg.CONF.register_opts([ cfg.IntOpt('periodic_interval', @@ -49,6 +52,18 @@ CLI_OPTIONS = [ help='Auth URL to use for openstack service access'), ] cfg.CONF.register_cli_opts(CLI_OPTIONS) +cfg.CONF.register_cli_opts(flags.core_opts) +cfg.CONF.register_cli_opts(flags.global_opts) + + +class PeriodicService(rpc_service.Service): + + def start(self): + super(PeriodicService, self).start() + admin_context = context.RequestContext('admin', 'admin', is_admin=True) + self.tg.add_timer(cfg.CONF.periodic_interval, + self.manager.periodic_tasks, + context=admin_context) def _sanitize_cmd_line(argv): diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 6ec54cb8b..9e88669f8 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -26,7 +26,7 @@ from stevedore import extension from stevedore.tests import manager as test_manager from ceilometer import meter -from ceilometer.collector import manager +from ceilometer.collector import service from ceilometer.openstack.common import cfg from ceilometer.storage import base from ceilometer.tests import base as tests_base @@ -83,11 +83,11 @@ TEST_NOTICE = { } -class TestCollectorManager(tests_base.TestCase): +class TestCollectorService(tests_base.TestCase): def setUp(self): - super(TestCollectorManager, self).setUp() - self.mgr = manager.CollectorManager() + super(TestCollectorService, self).setUp() + self.srv = service.CollectorService('the-host', 'the-topic') self.ctx = None #cfg.CONF.metering_secret = 'not-so-secret' @@ -97,7 +97,7 @@ class TestCollectorManager(tests_base.TestCase): # returns. Mock it out so we can establish the manager # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): - self.mgr.init_host() + self.srv.start() def test_valid_message(self): msg = {'counter_name': 'test', @@ -109,11 +109,11 @@ class TestCollectorManager(tests_base.TestCase): cfg.CONF.metering_secret, ) - self.mgr.storage_conn = self.mox.CreateMock(base.Connection) - self.mgr.storage_conn.record_metering_data(msg) + self.srv.storage_conn = self.mox.CreateMock(base.Connection) + self.srv.storage_conn.record_metering_data(msg) self.mox.ReplayAll() - self.mgr.record_metering_data(self.ctx, msg) + self.srv.record_metering_data(self.ctx, msg) self.mox.VerifyAll() def test_invalid_message(self): @@ -130,11 +130,11 @@ class TestCollectorManager(tests_base.TestCase): def record_metering_data(self, data): self.called = True - self.mgr.storage_conn = ErrorConnection() + self.srv.storage_conn = ErrorConnection() - self.mgr.record_metering_data(self.ctx, msg) + self.srv.record_metering_data(self.ctx, msg) - assert not self.mgr.storage_conn.called, \ + assert not self.srv.storage_conn.called, \ 'Should not have called the storage connection' def test_timestamp_conversion(self): @@ -152,11 +152,11 @@ class TestCollectorManager(tests_base.TestCase): expected.update(msg) expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40) - self.mgr.storage_conn = self.mox.CreateMock(base.Connection) - self.mgr.storage_conn.record_metering_data(expected) + self.srv.storage_conn = self.mox.CreateMock(base.Connection) + self.srv.storage_conn.record_metering_data(expected) self.mox.ReplayAll() - self.mgr.record_metering_data(self.ctx, msg) + self.srv.record_metering_data(self.ctx, msg) self.mox.VerifyAll() def test_timestamp_tzinfo_conversion(self): @@ -174,11 +174,11 @@ class TestCollectorManager(tests_base.TestCase): expected.update(msg) expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000) - self.mgr.storage_conn = self.mox.CreateMock(base.Connection) - self.mgr.storage_conn.record_metering_data(expected) + self.srv.storage_conn = self.mox.CreateMock(base.Connection) + self.srv.storage_conn.record_metering_data(expected) self.mox.ReplayAll() - self.mgr.record_metering_data(self.ctx, msg) + self.srv.record_metering_data(self.ctx, msg) self.mox.VerifyAll() def test_process_notification(self): @@ -186,15 +186,15 @@ class TestCollectorManager(tests_base.TestCase): # returns. Mock it out so we can establish the manager # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): - self.mgr.init_host() + self.srv.start() results = [] - self.stubs.Set(self.mgr, 'publish_counter', results.append) - self.mgr.ext_manager = test_manager.TestExtensionManager( + self.stubs.Set(self.srv, 'publish_counter', results.append) + self.srv.ext_manager = test_manager.TestExtensionManager( [extension.Extension('test', None, None, notifications.Instance(), ), ]) - self.mgr.process_notification(TEST_NOTICE) + self.srv.process_notification(TEST_NOTICE) self.assert_(len(results) >= 1) diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 3bf834416..e94082cdc 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -33,7 +33,6 @@ from ceilometer.openstack.common import cfg def test_load_plugins(): mgr = manager.AgentManager() - mgr.init_host() assert list(mgr.ext_manager), 'Failed to load any plugins' return