# -*- coding: utf-8 -*-
#
# Author: François Rossigneux
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for the kwapi energy and power pollsters."""

import datetime

import mock

from ceilometer.energy import kwapi
from ceilometer.central import manager
from ceilometer.openstack.common import context
from ceilometer.openstack.common import test
from ceilometer.openstack.common.fixture import moxstubout

from keystoneclient import exceptions


# Canned probe payload shared by every test below.  It is treated as
# read-only: helpers must copy entries rather than modify them in place.
PROBE_DICT = {
    "probes": {
        "A": {
            "timestamp": 1357730232.68754,
            "w": 107.3,
            "kwh": 0.001058255421506034
        },
        "B": {
            "timestamp": 1357730232.048158,
            "w": 15.0,
            "kwh": 0.029019045026169896
        },
        "C": {
            "timestamp": 1357730232.223375,
            "w": 95.0,
            "kwh": 0.17361822634312918
        }
    }
}


def _fake_iter_probes(self, ksclient, cache):
    """Replacement for ``kwapi._Base._iter_probes`` fed from PROBE_DICT.

    The signature mirrors the stubbed method: once installed on
    ``kwapi._Base`` the first argument receives the pollster instance.
    ``ksclient`` and ``cache`` are accepted but ignored.

    Yields one new dict per probe, holding the probe's fields plus an
    ``id`` key set to the probe name.  A copy is yielded on purpose so
    that the module-level PROBE_DICT is never mutated between tests.
    """
    for probe_id, probe in PROBE_DICT['probes'].items():
        yield dict(probe, id=probe_id)


class TestManager(manager.AgentManager):
    """Agent manager whose ``keystone`` attribute starts out as ``None``."""

    def __init__(self):
        super(TestManager, self).__init__()
        self.keystone = None


class TestKwapi(test.BaseTestCase):
    """Behaviour of the pollster when the kwapi client cannot be built."""

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(TestKwapi, self).setUp()
        self.stubs = self.useFixture(moxstubout.MoxStubout()).stubs
        self.context = context.get_admin_context()
        self.manager = TestManager()

    @staticmethod
    def fake_get_kwapi_client(self, ksclient):
        """Stand-in for ``get_kwapi_client`` that always raises."""
        raise exceptions.EndpointNotFound("fake keystone exception")

    def test_endpoint_not_exist(self):
        """No samples are expected when the endpoint lookup fails."""
        self.stubs.Set(kwapi._Base, 'get_kwapi_client',
                       self.fake_get_kwapi_client)

        samples = list(kwapi.EnergyPollster().get_samples(self.manager, {}))

        self.assertEqual(len(samples), 0)


class TestEnergyPollster(test.BaseTestCase):
    """EnergyPollster samples built from the canned probes."""

    # Kept as a static attribute so ``self.fake_iter_probes`` resolves to
    # the plain function, which is then installed on ``kwapi._Base``.
    fake_iter_probes = staticmethod(_fake_iter_probes)

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(TestEnergyPollster, self).setUp()
        self.stubs = self.useFixture(moxstubout.MoxStubout()).stubs
        self.context = context.get_admin_context()
        self.manager = TestManager()
        self.stubs.Set(kwapi._Base, '_iter_probes',
                       self.fake_iter_probes)

    def test_sample(self):
        """Each probe yields one sample whose volume is its ``kwh``."""
        cache = {}
        samples = list(kwapi.EnergyPollster().get_samples(
            self.manager,
            cache,
        ))
        self.assertEqual(len(samples), 3)
        samples_by_name = dict((s.resource_id, s) for s in samples)
        for name, probe in PROBE_DICT['probes'].items():
            sample = samples_by_name[name]
            expected = datetime.datetime.fromtimestamp(
                probe['timestamp']
            ).isoformat()
            self.assertEqual(sample.timestamp, expected)
            self.assertEqual(sample.volume, probe['kwh'])


class TestEnergyPollsterCache(test.BaseTestCase):
    """EnergyPollster must use probes already present in the cache."""

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(TestEnergyPollsterCache, self).setUp()
        self.context = context.get_admin_context()
        self.manager = TestManager()

    def test_get_samples_cached(self):
        """A pre-filled cache yields samples without ``_get_probes``."""
        probe = {'id': 'A'}
        probe.update(PROBE_DICT['probes']['A'])
        cache = {
            kwapi.EnergyPollster.CACHE_KEY_PROBE: [probe],
        }
        self.manager.keystone = mock.Mock()
        pollster = kwapi.EnergyPollster()
        # Any call to _get_probes fails the test: the cache must be used.
        with mock.patch.object(pollster, '_get_probes') as do_not_call:
            do_not_call.side_effect = AssertionError('should not be called')
            samples = list(pollster.get_samples(self.manager, cache))
        self.assertEqual(len(samples), 1)


class TestPowerPollster(test.BaseTestCase):
    """PowerPollster samples built from the canned probes."""

    # Kept as a static attribute so ``self.fake_iter_probes`` resolves to
    # the plain function, which is then installed on ``kwapi._Base``.
    fake_iter_probes = staticmethod(_fake_iter_probes)

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(TestPowerPollster, self).setUp()
        self.stubs = self.useFixture(moxstubout.MoxStubout()).stubs
        self.context = context.get_admin_context()
        self.manager = TestManager()
        self.stubs.Set(kwapi._Base, '_iter_probes',
                       self.fake_iter_probes)

    def test_sample(self):
        """Each probe yields one sample whose volume is its ``w``."""
        cache = {}
        samples = list(kwapi.PowerPollster().get_samples(
            self.manager,
            cache,
        ))
        self.assertEqual(len(samples), 3)
        samples_by_name = dict((s.resource_id, s) for s in samples)
        for name, probe in PROBE_DICT['probes'].items():
            sample = samples_by_name[name]
            expected = datetime.datetime.fromtimestamp(
                probe['timestamp']
            ).isoformat()
            self.assertEqual(sample.timestamp, expected)
            self.assertEqual(sample.volume, probe['w'])


class TestPowerPollsterCache(test.BaseTestCase):
    """PowerPollster must use probes already present in the cache."""

    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
    def setUp(self):
        super(TestPowerPollsterCache, self).setUp()
        self.context = context.get_admin_context()
        self.manager = TestManager()

    def test_get_samples_cached(self):
        """A pre-filled cache yields samples without ``_get_probes``."""
        probe = {'id': 'A'}
        probe.update(PROBE_DICT['probes']['A'])
        cache = {
            kwapi.PowerPollster.CACHE_KEY_PROBE: [probe],
        }
        self.manager.keystone = mock.Mock()
        pollster = kwapi.PowerPollster()
        # Any call to _get_probes fails the test: the cache must be used.
        with mock.patch.object(pollster, '_get_probes') as do_not_call:
            do_not_call.side_effect = AssertionError('should not be called')
            samples = list(pollster.get_samples(self.manager, cache))
        self.assertEqual(len(samples), 1)