Merge ceilometer-collector and ceilometer-collector-udp
This will simplify deployment. Change-Id: Iabf88eafae4e95825016d382cfa86ceb8c38f880
This commit is contained in:
parent
76c6d465d8
commit
b238968c3e
@ -39,24 +39,33 @@ OPTS = [
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group="collector")
|
||||
cfg.CONF.import_opt('rpc_backend', 'ceilometer.openstack.common.rpc')
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class UDPCollectorService(service.DispatchedService, os_service.Service):
|
||||
"""UDP listener for the collector service."""
|
||||
class CollectorService(service.DispatchedService, rpc_service.Service):
|
||||
"""Listener for the collector service."""
|
||||
|
||||
def start(self):
|
||||
"""Bind the UDP socket and handle incoming data."""
|
||||
super(UDPCollectorService, self).start()
|
||||
if cfg.CONF.collector.udp_address:
|
||||
self.tg.add_thread(self.start_udp)
|
||||
if cfg.CONF.rpc_backend:
|
||||
super(CollectorService, self).start()
|
||||
if not cfg.CONF.collector.udp_address:
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
def start_udp(self):
|
||||
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp.bind((cfg.CONF.collector.udp_address,
|
||||
cfg.CONF.collector.udp_port))
|
||||
|
||||
self.running = True
|
||||
while self.running:
|
||||
self.udp_run = True
|
||||
while self.udp_run:
|
||||
# NOTE(jd) Arbitrary limit of 64K because that ought to be
|
||||
# enough for anybody.
|
||||
data, source = udp.recvfrom(64 * 1024)
|
||||
@ -78,21 +87,8 @@ class UDPCollectorService(service.DispatchedService, os_service.Service):
|
||||
LOG.exception(_("UDP: Unable to store meter"))
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
super(UDPCollectorService, self).stop()
|
||||
|
||||
|
||||
def udp_collector():
|
||||
service.prepare_service()
|
||||
os_service.launch(UDPCollectorService()).wait()
|
||||
|
||||
|
||||
class CollectorService(service.DispatchedService, rpc_service.Service):
|
||||
|
||||
def start(self):
|
||||
super(CollectorService, self).start()
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
self.udp_run = False
|
||||
super(CollectorService, self).stop()
|
||||
|
||||
def initialize_service_hook(self, service):
|
||||
'''Consumers must be declared before consume_thread start.'''
|
||||
|
@ -37,9 +37,19 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
super(TestCollector, self).setUp()
|
||||
self.CONF = self.useFixture(config.Config()).conf
|
||||
self.CONF.set_override("connection", "log://", group='database')
|
||||
self.srv = service.CollectorService('the-host', 'the-topic')
|
||||
self.counter = sample.Sample(
|
||||
name='foobar',
|
||||
type='bad',
|
||||
unit='F',
|
||||
volume=1,
|
||||
user_id='jd',
|
||||
project_id='ceilometer',
|
||||
resource_id='cat',
|
||||
timestamp='NOW!',
|
||||
resource_metadata={},
|
||||
).as_dict()
|
||||
|
||||
|
||||
class TestUDPCollectorService(TestCollector):
|
||||
def _make_fake_socket(self):
|
||||
def recvfrom(size):
|
||||
# Make the loop stop
|
||||
@ -57,21 +67,6 @@ class TestUDPCollectorService(TestCollector):
|
||||
udp_socket.bind.assert_called_once_with((conf.udp_address,
|
||||
conf.udp_port))
|
||||
|
||||
def setUp(self):
|
||||
super(TestUDPCollectorService, self).setUp()
|
||||
self.srv = service.UDPCollectorService()
|
||||
self.counter = sample.Sample(
|
||||
name='foobar',
|
||||
type='bad',
|
||||
unit='F',
|
||||
volume=1,
|
||||
user_id='jd',
|
||||
project_id='ceilometer',
|
||||
resource_id='cat',
|
||||
timestamp='NOW!',
|
||||
resource_metadata={},
|
||||
).as_dict()
|
||||
|
||||
def test_udp_receive(self):
|
||||
mock_dispatcher = mock.MagicMock()
|
||||
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
|
||||
@ -89,7 +84,7 @@ class TestUDPCollectorService(TestCollector):
|
||||
|
||||
udp_socket = self._make_fake_socket()
|
||||
with patch('socket.socket', return_value=udp_socket):
|
||||
self.srv.start()
|
||||
self.srv.start_udp()
|
||||
|
||||
self._verify_udp_socket(udp_socket)
|
||||
|
||||
@ -115,7 +110,7 @@ class TestUDPCollectorService(TestCollector):
|
||||
|
||||
udp_socket = self._make_fake_socket()
|
||||
with patch('socket.socket', return_value=udp_socket):
|
||||
self.srv.start()
|
||||
self.srv.start_udp()
|
||||
|
||||
self._verify_udp_socket(udp_socket)
|
||||
|
||||
@ -130,17 +125,10 @@ class TestUDPCollectorService(TestCollector):
|
||||
udp_socket = self._make_fake_socket()
|
||||
with patch('socket.socket', return_value=udp_socket):
|
||||
with patch('msgpack.loads', self._raise_error):
|
||||
self.srv.start()
|
||||
self.srv.start_udp()
|
||||
|
||||
self._verify_udp_socket(udp_socket)
|
||||
|
||||
|
||||
class TestCollectorService(TestCollector):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCollectorService, self).setUp()
|
||||
self.srv = service.CollectorService('the-host', 'the-topic')
|
||||
|
||||
@patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
def test_init_host(self):
|
||||
# If we try to create a real RPC connection, init_host() never
|
||||
@ -148,3 +136,16 @@ class TestCollectorService(TestCollector):
|
||||
# configuration.
|
||||
with patch('ceilometer.openstack.common.rpc.create_connection'):
|
||||
self.srv.start()
|
||||
|
||||
def test_only_udp(self):
|
||||
"""Check that only UDP is started if rpc_backend is empty."""
|
||||
self.CONF.set_override('rpc_backend', '')
|
||||
udp_socket = self._make_fake_socket()
|
||||
with patch('socket.socket', return_value=udp_socket):
|
||||
self.srv.start()
|
||||
|
||||
def test_only_rpc(self):
|
||||
"""Check that only RPC is started if udp_address is empty."""
|
||||
self.CONF.set_override('udp_address', '', group='collector')
|
||||
with patch('ceilometer.openstack.common.rpc.create_connection'):
|
||||
self.srv.start()
|
||||
|
@ -123,7 +123,6 @@ console_scripts =
|
||||
ceilometer-dbsync = ceilometer.storage:dbsync
|
||||
ceilometer-expirer = ceilometer.storage:expirer
|
||||
ceilometer-collector = ceilometer.collector.service:collector
|
||||
ceilometer-collector-udp = ceilometer.collector.service:udp_collector
|
||||
ceilometer-alarm-evaluator = ceilometer.alarm.service:alarm_evaluator
|
||||
ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user