aodh/tests/test_notifier.py
Angus Salkeld e9a30f61fe Change from unittest to testtools
part of bug 1177924

Change-Id: I99be2c48dc7143015883b89130083c8ea11abeec
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
2013-05-17 09:52:34 +10:00

114 lines
4.0 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/notifier.py
"""
from ceilometer import notifier
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import transformer
from ceilometer.tests import base as tests_base
from stevedore import extension
# Sample notification used as the test fixture: an envelope (event type,
# message id, priority, publisher id, timestamp) wrapped around a payload
# describing a single instance.
MESSAGE = {
    u'event_type': u'compute.instance.create.end',
    u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
    u'payload': {
        u'created_at': u'2012-05-08 20:23:41',
        u'deleted_at': u'',
        u'disk_gb': 0,
        u'display_name': u'testme',
        u'fixed_ips': [
            {
                u'address': u'10.0.0.2',
                u'floating_ips': [],
                u'meta': {},
                u'type': u'fixed',
                u'version': 4,
            },
        ],
        u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
        u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
        u'instance_type': u'm1.tiny',
        u'instance_type_id': 2,
        u'launched_at': u'2012-05-08 20:23:47.985999',
        u'memory_mb': 512,
        u'state': u'active',
        u'state_description': u'',
        u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
        u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
        u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
        u'vcpus': 1,
        u'root_gb': 0,
        u'ephemeral_gb': 0,
        u'host': u'compute-host-name',
        u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
        u'os_type': u'linux?',
        u'architecture': u'x86',
        u'image_ref': u'UUID',
        u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
        u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
    },
    u'priority': u'INFO',
    u'publisher_id': u'compute.vagrant-precise',
    u'timestamp': u'2012-05-08 20:23:48.028195',
}
class TestNotifier(tests_base.TestCase):
    """Check that ``notifier.notify`` delivers counters to a publisher."""

    class PublisherClass(object):
        """Stub publisher that keeps every published counter in memory."""

        def __init__(self):
            # Everything passed to publish_counters() accumulates here.
            self.counters = []

        def publish_counters(self, ctxt, counter, source):
            """Record the published counters.

            ``counter`` is treated as an iterable: each of its elements is
            appended to ``self.counters``. ``ctxt`` and ``source`` are
            accepted but not used.
            """
            self.counters.extend(counter)

    def test_process_notification(self):
        """Notifying with MESSAGE publishes a 'disk.ephemeral.size' counter."""
        pub = self.PublisherClass()
        transformer_manager = transformer.TransformerExtensionManager(
            'ceilometer.transformer',
        )
        publisher_manager = publisher.PublisherExtensionManager(
            'fake',
        )
        # Override the manager's extensions so the only publisher it knows,
        # under the name 'test_pub', is the in-memory stub above.
        publisher_manager.extensions = [
            extension.Extension('test_pub', None, None, pub),
        ]
        publisher_manager.by_name = dict(
            (ext.name, ext) for ext in publisher_manager.extensions)

        # The pipeline manager lives in a module-level global of ``notifier``.
        # Put the previous value back when the test ends so the fake pipeline
        # does not leak into tests that run afterwards.
        self.addCleanup(setattr, notifier, '_pipeline_manager',
                        getattr(notifier, '_pipeline_manager', None))
        notifier._pipeline_manager = pipeline.PipelineManager(
            [{
                'name': "test_pipeline",
                'interval': 60,
                'counters': ['*'],
                'transformers': [],
                'publishers': ["test_pub"],
            }],
            transformer_manager,
            publisher_manager)

        # Nothing may be published before the notification is sent.
        self.assertEqual(len(pub.counters), 0)
        notifier.notify(None, MESSAGE)
        self.assertTrue(len(pub.counters) > 0)
        self.assertIn('disk.ephemeral.size',
                      [c.name for c in pub.counters])