Use the service.py in openstack-common

Also change the collector from a manager into a Service
to make it easier to access the rpc connection.

Change-Id: I1d79abb1db087110d565a9673a48de0502ac2a2b
This commit is contained in:
Angus Salkeld 2012-11-05 08:28:59 +11:00
parent 0a36c4c738
commit 2789f144d4
9 changed files with 77 additions and 69 deletions

View File

@ -21,19 +21,18 @@ import eventlet
eventlet.monkey_patch() eventlet.monkey_patch()
import sys import sys
from ceilometer import service as ceilo_service
from ceilometer.central import manager
from ceilometer.service import prepare_service from ceilometer.service import prepare_service
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from nova import service from ceilometer.openstack.common import service
if __name__ == '__main__': if __name__ == '__main__':
prepare_service(sys.argv) prepare_service(sys.argv)
server = \ mgr = manager.AgentManager()
service.Service.create( topic = 'ceilometer.agent.central'
binary='ceilometer-agent-central', ceilo = ceilo_service.PeriodicService(cfg.CONF.host,
topic='ceilometer.agent.central', topic, mgr)
manager='ceilometer.central.manager.AgentManager', launcher = service.launch(ceilo)
periodic_interval=cfg.CONF.periodic_interval) launcher.wait()
service.serve(server)
service.wait()

View File

@ -21,9 +21,13 @@ import eventlet
eventlet.monkey_patch() eventlet.monkey_patch()
import sys import sys
from ceilometer import service as ceilo_service
from ceilometer.compute import manager
from ceilometer.service import prepare_service from ceilometer.service import prepare_service
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from nova import service from ceilometer.openstack.common import service
from nova import flags
from nova.compute import manager as compute_manager from nova.compute import manager as compute_manager
@ -35,11 +39,9 @@ if __name__ == '__main__':
cfg.CONF.register_opts(compute_manager.compute_opts) cfg.CONF.register_opts(compute_manager.compute_opts)
prepare_service(sys.argv) prepare_service(sys.argv)
server = \ mgr = manager.AgentManager()
service.Service.create( topic = 'ceilometer.agent.compute'
binary='ceilometer-agent-compute', ceilo = ceilo_service.PeriodicService(cfg.CONF.host,
topic='ceilometer.agent.compute', topic, mgr)
manager='ceilometer.compute.manager.AgentManager', launcher = service.launch(ceilo)
periodic_interval=cfg.CONF.periodic_interval) launcher.wait()
service.serve(server)
service.wait()

View File

@ -21,17 +21,17 @@ import eventlet
eventlet.monkey_patch() eventlet.monkey_patch()
import sys import sys
from ceilometer.collector import service as coll_service
from ceilometer.service import prepare_service from ceilometer.service import prepare_service
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from nova import service from ceilometer.openstack.common import service
if __name__ == '__main__': if __name__ == '__main__':
prepare_service(sys.argv) prepare_service(sys.argv)
server = \ topic = 'ceilometer.collector'
service.Service.create(binary='ceilometer-collector', ceilo = coll_service.CollectorService(cfg.CONF.host,
topic='ceilometer.collector', topic)
manager='ceilometer.collector.' launcher = service.launch(ceilo)
'manager.CollectorManager') launcher.wait()
service.serve(server)
service.wait()

View File

@ -18,11 +18,10 @@
from stevedore import extension from stevedore import extension
from nova import manager
from ceilometer import extension_manager from ceilometer import extension_manager
from ceilometer import meter from ceilometer import meter
from ceilometer import publish from ceilometer import publish
from ceilometer import service
from ceilometer import storage from ceilometer import storage
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
@ -50,14 +49,12 @@ cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class CollectorManager(manager.Manager): class CollectorService(service.PeriodicService):
COLLECTOR_NAMESPACE = 'ceilometer.collector' COLLECTOR_NAMESPACE = 'ceilometer.collector'
def init_host(self): def start(self):
# FIXME(dhellmann): Update Manager API to get Service instance super(CollectorService, self).start()
# with existing rpc handle.
self.connection = rpc.create_connection()
storage.register_opts(cfg.CONF) storage.register_opts(cfg.CONF)
self.storage_engine = storage.get_engine(cfg.CONF) self.storage_engine = storage.get_engine(cfg.CONF)
@ -75,15 +72,13 @@ class CollectorManager(manager.Manager):
self.ext_manager.map(self._setup_subscription) self.ext_manager.map(self._setup_subscription)
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer(). # since the default for service is to use create_consumer().
self.connection.create_worker( self.conn.create_worker(
cfg.CONF.metering_topic, cfg.CONF.metering_topic,
rpc_dispatcher.RpcDispatcher([self]), rpc_dispatcher.RpcDispatcher([self]),
'ceilometer.collector.' + cfg.CONF.metering_topic, 'ceilometer.collector.' + cfg.CONF.metering_topic,
) )
self.connection.consume_in_thread()
def _setup_subscription(self, ext, *args, **kwds): def _setup_subscription(self, ext, *args, **kwds):
handler = ext.obj handler = ext.obj
LOG.debug('Event types from %s: %s', LOG.debug('Event types from %s: %s',
@ -94,7 +89,7 @@ class CollectorManager(manager.Manager):
# that notification messages do not conform to the RPC # that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method" # invocation protocol (they do not include a "method"
# parameter). # parameter).
self.connection.declare_topic_consumer( self.conn.declare_topic_consumer(
queue_name="ceilometer.notifications", queue_name="ceilometer.notifications",
topic=topic, topic=topic,
exchange_name=exchange_topic.exchange, exchange_name=exchange_topic.exchange,
@ -150,3 +145,6 @@ class CollectorManager(manager.Manager):
except Exception as err: except Exception as err:
LOG.error('Failed to record metering data: %s', err) LOG.error('Failed to record metering data: %s', err)
LOG.exception(err) LOG.exception(err)
def periodic_tasks(self, context):
pass

View File

@ -16,8 +16,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from nova import manager
from ceilometer import extension_manager from ceilometer import extension_manager
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
@ -39,13 +37,11 @@ LOG = log.getLogger(__name__)
PLUGIN_NAMESPACE = 'ceilometer.poll.compute' PLUGIN_NAMESPACE = 'ceilometer.poll.compute'
class AgentManager(manager.Manager): class AgentManager(object):
def __init__(self, host=None): def __init__(self):
super(AgentManager, self).__init__(host=host)
self.resources = resources.Resources() self.resources = resources.Resources()
def init_host(self):
self.ext_manager = extension_manager.ActivatedExtensionManager( self.ext_manager = extension_manager.ActivatedExtensionManager(
namespace=PLUGIN_NAMESPACE, namespace=PLUGIN_NAMESPACE,
disabled_names=cfg.CONF.disabled_compute_pollsters, disabled_names=cfg.CONF.disabled_compute_pollsters,

View File

@ -46,7 +46,6 @@ def initialize_manager():
cfg.CONF(args=[], project='ceilometer', prog='ceilometer-agent') cfg.CONF(args=[], project='ceilometer', prog='ceilometer-agent')
# Instantiate a manager # Instantiate a manager
_agent_manager = AgentManager() _agent_manager = AgentManager()
_agent_manager.init_host()
def notify(context, message): def notify(context, message):

View File

@ -21,8 +21,11 @@ import os
from nova import flags from nova import flags
from ceilometer.openstack.common import log
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer.openstack.common.rpc import service as rpc_service
cfg.CONF.register_opts([ cfg.CONF.register_opts([
cfg.IntOpt('periodic_interval', cfg.IntOpt('periodic_interval',
@ -49,6 +52,18 @@ CLI_OPTIONS = [
help='Auth URL to use for openstack service access'), help='Auth URL to use for openstack service access'),
] ]
cfg.CONF.register_cli_opts(CLI_OPTIONS) cfg.CONF.register_cli_opts(CLI_OPTIONS)
cfg.CONF.register_cli_opts(flags.core_opts)
cfg.CONF.register_cli_opts(flags.global_opts)
class PeriodicService(rpc_service.Service):
def start(self):
super(PeriodicService, self).start()
admin_context = context.RequestContext('admin', 'admin', is_admin=True)
self.tg.add_timer(cfg.CONF.periodic_interval,
self.manager.periodic_tasks,
context=admin_context)
def _sanitize_cmd_line(argv): def _sanitize_cmd_line(argv):

View File

@ -26,7 +26,7 @@ from stevedore import extension
from stevedore.tests import manager as test_manager from stevedore.tests import manager as test_manager
from ceilometer import meter from ceilometer import meter
from ceilometer.collector import manager from ceilometer.collector import service
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.storage import base from ceilometer.storage import base
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
@ -83,11 +83,11 @@ TEST_NOTICE = {
} }
class TestCollectorManager(tests_base.TestCase): class TestCollectorService(tests_base.TestCase):
def setUp(self): def setUp(self):
super(TestCollectorManager, self).setUp() super(TestCollectorService, self).setUp()
self.mgr = manager.CollectorManager() self.srv = service.CollectorService('the-host', 'the-topic')
self.ctx = None self.ctx = None
#cfg.CONF.metering_secret = 'not-so-secret' #cfg.CONF.metering_secret = 'not-so-secret'
@ -97,7 +97,7 @@ class TestCollectorManager(tests_base.TestCase):
# returns. Mock it out so we can establish the manager # returns. Mock it out so we can establish the manager
# configuration. # configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.mgr.init_host() self.srv.start()
def test_valid_message(self): def test_valid_message(self):
msg = {'counter_name': 'test', msg = {'counter_name': 'test',
@ -109,11 +109,11 @@ class TestCollectorManager(tests_base.TestCase):
cfg.CONF.metering_secret, cfg.CONF.metering_secret,
) )
self.mgr.storage_conn = self.mox.CreateMock(base.Connection) self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.mgr.storage_conn.record_metering_data(msg) self.srv.storage_conn.record_metering_data(msg)
self.mox.ReplayAll() self.mox.ReplayAll()
self.mgr.record_metering_data(self.ctx, msg) self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll() self.mox.VerifyAll()
def test_invalid_message(self): def test_invalid_message(self):
@ -130,11 +130,11 @@ class TestCollectorManager(tests_base.TestCase):
def record_metering_data(self, data): def record_metering_data(self, data):
self.called = True self.called = True
self.mgr.storage_conn = ErrorConnection() self.srv.storage_conn = ErrorConnection()
self.mgr.record_metering_data(self.ctx, msg) self.srv.record_metering_data(self.ctx, msg)
assert not self.mgr.storage_conn.called, \ assert not self.srv.storage_conn.called, \
'Should not have called the storage connection' 'Should not have called the storage connection'
def test_timestamp_conversion(self): def test_timestamp_conversion(self):
@ -152,11 +152,11 @@ class TestCollectorManager(tests_base.TestCase):
expected.update(msg) expected.update(msg)
expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40) expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40)
self.mgr.storage_conn = self.mox.CreateMock(base.Connection) self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.mgr.storage_conn.record_metering_data(expected) self.srv.storage_conn.record_metering_data(expected)
self.mox.ReplayAll() self.mox.ReplayAll()
self.mgr.record_metering_data(self.ctx, msg) self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll() self.mox.VerifyAll()
def test_timestamp_tzinfo_conversion(self): def test_timestamp_tzinfo_conversion(self):
@ -174,11 +174,11 @@ class TestCollectorManager(tests_base.TestCase):
expected.update(msg) expected.update(msg)
expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000) expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000)
self.mgr.storage_conn = self.mox.CreateMock(base.Connection) self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.mgr.storage_conn.record_metering_data(expected) self.srv.storage_conn.record_metering_data(expected)
self.mox.ReplayAll() self.mox.ReplayAll()
self.mgr.record_metering_data(self.ctx, msg) self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll() self.mox.VerifyAll()
def test_process_notification(self): def test_process_notification(self):
@ -186,15 +186,15 @@ class TestCollectorManager(tests_base.TestCase):
# returns. Mock it out so we can establish the manager # returns. Mock it out so we can establish the manager
# configuration. # configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.mgr.init_host() self.srv.start()
results = [] results = []
self.stubs.Set(self.mgr, 'publish_counter', results.append) self.stubs.Set(self.srv, 'publish_counter', results.append)
self.mgr.ext_manager = test_manager.TestExtensionManager( self.srv.ext_manager = test_manager.TestExtensionManager(
[extension.Extension('test', [extension.Extension('test',
None, None,
None, None,
notifications.Instance(), notifications.Instance(),
), ),
]) ])
self.mgr.process_notification(TEST_NOTICE) self.srv.process_notification(TEST_NOTICE)
self.assert_(len(results) >= 1) self.assert_(len(results) >= 1)

View File

@ -33,7 +33,6 @@ from ceilometer.openstack.common import cfg
def test_load_plugins(): def test_load_plugins():
mgr = manager.AgentManager() mgr = manager.AgentManager()
mgr.init_host()
assert list(mgr.ext_manager), 'Failed to load any plugins' assert list(mgr.ext_manager), 'Failed to load any plugins'
return return