Vladislav Kuzmin 517a0eeaa5 Replace tests.base part2
This is the next step in replacing ceilometer.tests.base with
openstack.common.test.
This patch switches the tests to openstack.common.test and adds the
needed fixtures to the test classes.

Change-Id: I091fbf1419e78bf69491f5787deb93306d933c27
2013-10-18 14:40:23 +04:00

107 lines
3.8 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/collector/dispatcher/file.py
"""
import logging.handlers
import os
import shutil
import tempfile

from ceilometer.collector.dispatcher import file
from ceilometer.openstack.common import test
from ceilometer.openstack.common.fixture import config
from ceilometer.publisher import rpc
class TestDispatcherFile(test.BaseTestCase):
    """Tests for FileDispatcher under different dispatcher_file settings."""

    def setUp(self):
        super(TestDispatcherFile, self).setUp()
        # Configuration object that each test mutates before building
        # the dispatcher.
        self.CONF = self.useFixture(config.Config()).conf

    def _make_log_path(self):
        """Return a not-yet-existing file path inside a fresh temp directory.

        The directory is removed during test cleanup, so the file the
        dispatcher creates (and anything it writes next to it) does not
        outlive the test.
        """
        tmp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp_dir, ignore_errors=True)
        return os.path.join(tmp_dir, 'dispatcher.log')

    def _build_dispatcher(self, max_bytes, backup_count):
        """Configure dispatcher_file and return a new FileDispatcher.

        :param max_bytes: value assigned to dispatcher_file.max_bytes
        :param backup_count: value assigned to dispatcher_file.backup_count
        """
        self.CONF.dispatcher_file.file_path = self._make_log_path()
        self.CONF.dispatcher_file.max_bytes = max_bytes
        self.CONF.dispatcher_file.backup_count = backup_count
        return file.FileDispatcher(self.CONF)

    def _get_single_handler(self, dispatcher, handler_class):
        """Assert the dispatcher has exactly one handler and return it.

        :param dispatcher: the FileDispatcher under test
        :param handler_class: class the handler must be an instance of
        """
        # The number of the handlers should be 1
        self.assertEqual(1, len(dispatcher.log.handlers))
        handler = dispatcher.log.handlers[0]
        # Cleanups run in reverse order of registration, so the handler
        # is closed before the temp directory is removed.
        self.addCleanup(handler.close)
        self.assertIsInstance(handler, handler_class)
        return handler

    def _record_sample_and_check_file(self, dispatcher, handler):
        """Send one signed sample and assert the handler's file exists."""
        msg = {'counter_name': 'test',
               'resource_id': self.id(),
               'counter_volume': 1,
               }
        msg['message_signature'] = rpc.compute_signature(
            msg,
            self.CONF.publisher_rpc.metering_secret,
        )

        # The record_metering_data method should exist and not produce
        # errors.
        dispatcher.record_metering_data(None, msg)
        # After the method call above, the file should have been created.
        self.assertTrue(os.path.exists(handler.baseFilename))

    def test_file_dispatcher_with_all_config(self):
        dispatcher = self._build_dispatcher(max_bytes=50, backup_count=5)

        # The handler should be RotatingFileHandler
        handler = self._get_single_handler(
            dispatcher, logging.handlers.RotatingFileHandler)

        self._record_sample_and_check_file(dispatcher, handler)

    def test_file_dispatcher_with_path_only(self):
        dispatcher = self._build_dispatcher(max_bytes=None,
                                            backup_count=None)

        # The handler should be a FileHandler (or a subclass of it)
        handler = self._get_single_handler(dispatcher, logging.FileHandler)

        self._record_sample_and_check_file(dispatcher, handler)

    def test_file_dispatcher_with_no_path(self):
        self.CONF.dispatcher_file.file_path = None
        dispatcher = file.FileDispatcher(self.CONF)

        # The log should be None
        self.assertIsNone(dispatcher.log)