Merge "Add encoding argument to deserialising udp packets in collector"
This commit is contained in:
commit
3d0e7552dd
@ -81,7 +81,7 @@ class CollectorService(service.DispatchedService, os_service.Service):
|
|||||||
# enough for anybody.
|
# enough for anybody.
|
||||||
data, source = udp.recvfrom(64 * units.Ki)
|
data, source = udp.recvfrom(64 * units.Ki)
|
||||||
try:
|
try:
|
||||||
sample = msgpack.loads(data)
|
sample = msgpack.loads(data, encoding='utf-8')
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
|
LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
|
||||||
else:
|
else:
|
||||||
|
@ -26,6 +26,7 @@ from stevedore import extension
|
|||||||
from ceilometer import collector
|
from ceilometer import collector
|
||||||
from ceilometer import messaging
|
from ceilometer import messaging
|
||||||
from ceilometer.openstack.common.fixture import config
|
from ceilometer.openstack.common.fixture import config
|
||||||
|
from ceilometer.publisher import utils
|
||||||
from ceilometer import sample
|
from ceilometer import sample
|
||||||
from ceilometer.tests import base as tests_base
|
from ceilometer.tests import base as tests_base
|
||||||
|
|
||||||
@ -43,6 +44,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
self.CONF = self.useFixture(config.Config()).conf
|
self.CONF = self.useFixture(config.Config()).conf
|
||||||
self.CONF.set_override("connection", "log://", group='database')
|
self.CONF.set_override("connection", "log://", group='database')
|
||||||
self.srv = collector.CollectorService()
|
self.srv = collector.CollectorService()
|
||||||
|
self.CONF.publisher.metering_secret = 'not-so-secret'
|
||||||
self.counter = sample.Sample(
|
self.counter = sample.Sample(
|
||||||
name='foobar',
|
name='foobar',
|
||||||
type='bad',
|
type='bad',
|
||||||
@ -55,6 +57,21 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
resource_metadata={},
|
resource_metadata={},
|
||||||
).as_dict()
|
).as_dict()
|
||||||
|
|
||||||
|
self.utf8_msg = utils.meter_message_from_counter(
|
||||||
|
sample.Sample(
|
||||||
|
name=u'test',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
unit=u'',
|
||||||
|
volume=1,
|
||||||
|
user_id=u'test',
|
||||||
|
project_id=u'test',
|
||||||
|
resource_id=u'test_run_tasks',
|
||||||
|
timestamp=u'NOW!',
|
||||||
|
resource_metadata={u'name': [([u'TestPublish'])]},
|
||||||
|
source=u'testsource',
|
||||||
|
),
|
||||||
|
'not-so-secret')
|
||||||
|
|
||||||
def _make_test_manager(self, plugin):
|
def _make_test_manager(self, plugin):
|
||||||
return extension.ExtensionManager.make_test_instance([
|
return extension.ExtensionManager.make_test_instance([
|
||||||
extension.Extension(
|
extension.Extension(
|
||||||
@ -65,11 +82,11 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
),
|
),
|
||||||
])
|
])
|
||||||
|
|
||||||
def _make_fake_socket(self):
|
def _make_fake_socket(self, sample):
|
||||||
def recvfrom(size):
|
def recvfrom(size):
|
||||||
# Make the loop stop
|
# Make the loop stop
|
||||||
self.srv.stop()
|
self.srv.stop()
|
||||||
return (msgpack.dumps(self.counter), ('127.0.0.1', 12345))
|
return msgpack.dumps(sample), ('127.0.0.1', 12345)
|
||||||
|
|
||||||
sock = mock.Mock()
|
sock = mock.Mock()
|
||||||
sock.recvfrom = recvfrom
|
sock.recvfrom = recvfrom
|
||||||
@ -99,7 +116,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
self.counter['counter_type'] = self.counter['type']
|
self.counter['counter_type'] = self.counter['type']
|
||||||
self.counter['counter_unit'] = self.counter['unit']
|
self.counter['counter_unit'] = self.counter['unit']
|
||||||
|
|
||||||
udp_socket = self._make_fake_socket()
|
udp_socket = self._make_fake_socket(self.counter)
|
||||||
with mock.patch('socket.socket', return_value=udp_socket):
|
with mock.patch('socket.socket', return_value=udp_socket):
|
||||||
self.srv.start_udp()
|
self.srv.start_udp()
|
||||||
|
|
||||||
@ -120,7 +137,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
self.counter['counter_type'] = self.counter['type']
|
self.counter['counter_type'] = self.counter['type']
|
||||||
self.counter['counter_unit'] = self.counter['unit']
|
self.counter['counter_unit'] = self.counter['unit']
|
||||||
|
|
||||||
udp_socket = self._make_fake_socket()
|
udp_socket = self._make_fake_socket(self.counter)
|
||||||
with mock.patch('socket.socket', return_value=udp_socket):
|
with mock.patch('socket.socket', return_value=udp_socket):
|
||||||
self.srv.start_udp()
|
self.srv.start_udp()
|
||||||
|
|
||||||
@ -135,7 +152,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
def test_udp_receive_bad_decoding(self):
|
def test_udp_receive_bad_decoding(self):
|
||||||
self.CONF.set_override('rpc_backend', '')
|
self.CONF.set_override('rpc_backend', '')
|
||||||
udp_socket = self._make_fake_socket()
|
udp_socket = self._make_fake_socket(self.counter)
|
||||||
with mock.patch('socket.socket', return_value=udp_socket):
|
with mock.patch('socket.socket', return_value=udp_socket):
|
||||||
with mock.patch('msgpack.loads', self._raise_error):
|
with mock.patch('msgpack.loads', self._raise_error):
|
||||||
self.srv.start_udp()
|
self.srv.start_udp()
|
||||||
@ -147,7 +164,7 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
def test_only_udp(self, udp_start, rpc_start):
|
def test_only_udp(self, udp_start, rpc_start):
|
||||||
"""Check that only UDP is started if rpc_backend is empty."""
|
"""Check that only UDP is started if rpc_backend is empty."""
|
||||||
self.CONF.set_override('rpc_backend', '')
|
self.CONF.set_override('rpc_backend', '')
|
||||||
udp_socket = self._make_fake_socket()
|
udp_socket = self._make_fake_socket(self.counter)
|
||||||
with mock.patch('socket.socket', return_value=udp_socket):
|
with mock.patch('socket.socket', return_value=udp_socket):
|
||||||
self.srv.start()
|
self.srv.start()
|
||||||
|
|
||||||
@ -168,3 +185,16 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
eventlet.sleep(0)
|
eventlet.sleep(0)
|
||||||
self.assertEqual(1, rpc_start.call_count)
|
self.assertEqual(1, rpc_start.call_count)
|
||||||
self.assertEqual(0, udp_start.call_count)
|
self.assertEqual(0, udp_start.call_count)
|
||||||
|
|
||||||
|
def test_udp_receive_valid_encoding(self):
|
||||||
|
self.data_sent = []
|
||||||
|
with mock.patch('socket.socket',
|
||||||
|
return_value=self._make_fake_socket(self.utf8_msg)):
|
||||||
|
self.srv.rpc_server = mock.MagicMock()
|
||||||
|
mock_dispatcher = mock.MagicMock()
|
||||||
|
self.srv.dispatcher_manager = \
|
||||||
|
self._make_test_manager(mock_dispatcher)
|
||||||
|
self.srv.start_udp()
|
||||||
|
self.assertTrue(utils.verify_signature(
|
||||||
|
mock_dispatcher.method_calls[0][1][0],
|
||||||
|
"not-so-secret"))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user