Fix order of arguments in assertEquals

Part 5

Related to: bug #1277104

Change-Id: I44fda94aeea972cf43482bf3c39537904477938b
This commit is contained in:
Ilya Tyaptin 2014-02-06 20:52:22 +04:00 committed by Lianhao Lu
parent 26c6ce8c1f
commit 867cecf93b
13 changed files with 309 additions and 319 deletions

View File

@ -233,7 +233,7 @@ class BasePipelineTestCase(test.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(pipe.get_interval(), 5)
self.assertEqual(5, pipe.get_interval())
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
@ -243,11 +243,11 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], "name"))
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
@ -259,7 +259,7 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(1, len(publisher.samples))
self.test_counter = sample.Sample(
name='b',
@ -276,10 +276,10 @@ class BasePipelineTestCase(test.BaseTestCase):
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(getattr(publisher.samples[1], "name"), 'b_update')
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2, len(self.TransformerClass.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
def test_counter_dont_match(self):
counter_cfg = ['nomatch']
@ -290,8 +290,8 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(publisher.calls, 0)
self.assertEqual(0, len(publisher.samples))
self.assertEqual(0, publisher.calls)
def test_wildcard_counter(self):
counter_cfg = ['*']
@ -302,9 +302,9 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
@ -321,10 +321,9 @@ class BasePipelineTestCase(test.BaseTestCase):
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"),
'a_update')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
@ -335,11 +334,11 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], "name"))
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
@ -405,18 +404,18 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, publisher.calls)
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
new_publisher = pipeline_manager.pipelines[1].publishers[0]
self.assertEqual(len(new_publisher.samples), 1)
self.assertEqual(new_publisher.calls, 1)
self.assertEqual(getattr(new_publisher.samples[0], "name"), 'b_new')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], "name"),
'b')
self.assertEqual(1, len(new_publisher.samples))
self.assertEqual(1, new_publisher.calls)
self.assertEqual('b_new', getattr(new_publisher.samples[0], "name"))
self.assertEqual(2, len(self.TransformerClass.samples))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], "name"))
self.assertEqual('b',
getattr(self.TransformerClass.samples[1], "name"))
def test_multiple_pipeline_exception(self):
self._break_pipeline_cfg()
@ -442,14 +441,14 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(publisher.calls, 1)
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], "name"),
'b')
self.assertEqual(1, publisher.calls)
self.assertEqual(1, len(publisher.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual(2, len(self.TransformerClass.samples))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], "name"))
self.assertEqual('b',
getattr(self.TransformerClass.samples[1], "name"))
def test_none_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', None)
@ -458,9 +457,9 @@ class BasePipelineTestCase(test.BaseTestCase):
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.samples[0], 'name'), 'a')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, publisher.calls)
self.assertEqual('a', getattr(publisher.samples[0], 'name'))
def test_empty_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', [])
@ -469,9 +468,9 @@ class BasePipelineTestCase(test.BaseTestCase):
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.samples[0], 'name'), 'a')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, publisher.calls)
self.assertEqual('a', getattr(publisher.samples[0], 'name'))
def test_multiple_transformer_same_class(self):
transformer_cfg = [
@ -492,15 +491,15 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(publisher.calls, 1)
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_update')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'),
'a_update')
self.assertEqual(1, publisher.calls)
self.assertEqual(1, len(publisher.samples))
self.assertEqual('a_update_update',
getattr(publisher.samples[0], 'name'))
self.assertEqual(2, len(self.TransformerClass.samples))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], 'name'))
self.assertEqual('a_update',
getattr(self.TransformerClass.samples[1], 'name'))
def test_multiple_transformer_same_class_different_parameter(self):
transformer_cfg = [
@ -525,15 +524,16 @@ class BasePipelineTestCase(test.BaseTestCase):
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'),
'a_update')
self.assertEqual(2, len(self.TransformerClass.samples))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], 'name'))
self.assertEqual('a_update',
getattr(self.TransformerClass.samples[1], 'name'))
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_new')
self.assertEqual(1,
len(publisher.samples))
self.assertEqual('a_update_new',
getattr(publisher.samples[0], 'name'))
def test_multiple_transformer_drop_transformer(self):
transformer_cfg = [
@ -563,13 +563,14 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(len(self.TransformerClassDrop.samples), 1)
self.assertEqual(getattr(self.TransformerClassDrop.samples[0], 'name'),
'a_update')
self.assertEqual(0, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a',
getattr(self.TransformerClass.samples[0], 'name'))
self.assertEqual(1,
len(self.TransformerClassDrop.samples))
self.assertEqual('a_update',
getattr(self.TransformerClassDrop.samples[0], 'name'))
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
@ -581,12 +582,12 @@ class BasePipelineTestCase(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(new_publisher.samples), 1)
self.assertEqual(getattr(new_publisher.samples[0], 'name'),
'a_update')
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(new_publisher.samples))
self.assertEqual('a_update',
getattr(new_publisher.samples[0], 'name'))
self.assertEqual('a_update',
getattr(publisher.samples[0], 'name'))
def test_multiple_publisher_isolation(self):
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
@ -596,9 +597,9 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(len(new_publisher.samples), 1)
self.assertEqual(getattr(new_publisher.samples[0], 'name'),
'a_update')
self.assertEqual(1, len(new_publisher.samples))
self.assertEqual('a_update',
getattr(new_publisher.samples[0], 'name'))
def test_multiple_counter_pipeline(self):
self._set_pipeline_cfg('counters', ['a', 'b'])
@ -619,9 +620,9 @@ class BasePipelineTestCase(test.BaseTestCase):
)])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(getattr(publisher.samples[0], 'name'), 'a_update')
self.assertEqual(getattr(publisher.samples[1], 'name'), 'b_update')
self.assertEqual(2, len(publisher.samples))
self.assertEqual('a_update', getattr(publisher.samples[0], 'name'))
self.assertEqual('b_update', getattr(publisher.samples[1], 'name'))
def test_flush_pipeline_cache(self):
CACHE_SIZE = 10
@ -647,18 +648,17 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_sample(None, self.test_counter)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
pipe.publish_sample(None, self.test_counter)
pipe.flush(None)
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
for i in range(CACHE_SIZE - 2):
pipe.publish_sample(None, self.test_counter)
pipe.flush(None)
self.assertEqual(len(publisher.samples), CACHE_SIZE)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_new')
self.assertEqual(CACHE_SIZE, len(publisher.samples))
self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
def test_flush_pipeline_cache_multiple_counter(self):
CACHE_SIZE = 3
@ -696,16 +696,16 @@ class BasePipelineTestCase(test.BaseTestCase):
)])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertEqual(len(publisher.samples), CACHE_SIZE)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_new')
self.assertEqual(getattr(publisher.samples[1], 'name'),
'b_update_new')
self.assertEqual(CACHE_SIZE, len(publisher.samples))
self.assertEqual('a_update_new',
getattr(publisher.samples[0], 'name'))
self.assertEqual('b_update_new',
getattr(publisher.samples[1], 'name'))
def test_flush_pipeline_cache_before_publisher(self):
extra_transformer_cfg = [{
@ -719,11 +719,11 @@ class BasePipelineTestCase(test.BaseTestCase):
publisher = pipe.publishers[0]
pipe.publish_sample(None, self.test_counter)
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update')
self.assertEqual(1, len(publisher.samples))
self.assertEqual('a_update',
getattr(publisher.samples[0], 'name'))
def test_variable_counter(self):
self.pipeline_cfg = [{
@ -755,12 +755,12 @@ class BasePipelineTestCase(test.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"),
'a:b_update')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a:b')
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(self.TransformerClass.samples))
self.assertEqual('a:b_update',
getattr(publisher.samples[0], "name"))
self.assertEqual('a:b',
getattr(self.TransformerClass.samples[0], "name"))
def test_global_unit_conversion(self):
scale = 'volume / ((10**6) * 60)'
@ -797,14 +797,14 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(1, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(1, len(publisher.samples))
cpu_mins = publisher.samples[-1]
self.assertEqual(getattr(cpu_mins, 'name'), 'cpu_mins')
self.assertEqual(getattr(cpu_mins, 'unit'), 'min')
self.assertEqual(getattr(cpu_mins, 'type'), sample.TYPE_CUMULATIVE)
self.assertEqual(getattr(cpu_mins, 'volume'), 20)
self.assertEqual('cpu_mins', getattr(cpu_mins, 'name'))
self.assertEqual('min', getattr(cpu_mins, 'unit'))
self.assertEqual(sample.TYPE_CUMULATIVE, getattr(cpu_mins, 'type'))
self.assertEqual(20, getattr(cpu_mins, 'volume'))
def test_unit_identified_source_unit_conversion(self):
transformer_cfg = [
@ -851,16 +851,16 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(2, len(publisher.samples))
core_temp = publisher.samples[1]
self.assertEqual(getattr(core_temp, 'name'), 'core_temperature')
self.assertEqual(getattr(core_temp, 'unit'), '°F')
self.assertEqual(getattr(core_temp, 'volume'), 96.8)
self.assertEqual('core_temperature', getattr(core_temp, 'name'))
self.assertEqual('°F', getattr(core_temp, 'unit'))
self.assertEqual(96.8, getattr(core_temp, 'volume'))
amb_temp = publisher.samples[0]
self.assertEqual(getattr(amb_temp, 'name'), 'ambient_temperature')
self.assertEqual(getattr(amb_temp, 'unit'), '°F')
self.assertEqual(getattr(amb_temp, 'volume'), 88.8)
self.assertEqual(getattr(core_temp, 'volume'), 96.8)
self.assertEqual('ambient_temperature', getattr(amb_temp, 'name'))
self.assertEqual('°F', getattr(amb_temp, 'unit'))
self.assertEqual(88.8, getattr(amb_temp, 'volume'))
self.assertEqual(96.8, getattr(core_temp, 'volume'))
def _do_test_rate_of_change_conversion(self, prev, curr, type, expected,
offset=1, weight=None):
@ -941,21 +941,21 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(2, len(publisher.samples))
cpu_util = publisher.samples[0]
self.assertEqual(getattr(cpu_util, 'name'), 'cpu_util')
self.assertEqual(getattr(cpu_util, 'resource_id'), 'test_resource')
self.assertEqual(getattr(cpu_util, 'unit'), '%')
self.assertEqual(getattr(cpu_util, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(cpu_util, 'volume'), expected)
self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
self.assertEqual('test_resource', getattr(cpu_util, 'resource_id'))
self.assertEqual('%', getattr(cpu_util, 'unit'))
self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type'))
self.assertEqual(expected, getattr(cpu_util, 'volume'))
cpu_util = publisher.samples[1]
self.assertEqual(getattr(cpu_util, 'name'), 'cpu_util')
self.assertEqual(getattr(cpu_util, 'resource_id'), 'test_resource2')
self.assertEqual(getattr(cpu_util, 'unit'), '%')
self.assertEqual(getattr(cpu_util, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(cpu_util, 'volume'), expected * 2)
self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
self.assertEqual('test_resource2', getattr(cpu_util, 'resource_id'))
self.assertEqual('%', getattr(cpu_util, 'unit'))
self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type'))
self.assertEqual(expected * 2, getattr(cpu_util, 'volume'))
def test_rate_of_change_conversion(self):
self._do_test_rate_of_change_conversion(120000000000,
@ -1026,23 +1026,22 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 0)
self.assertEqual(0, len(publisher.samples))
def test_resources(self):
resources = ['test1://', 'test2://']
self._set_pipeline_cfg('resources', resources)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(pipeline_manager.pipelines[0].resources,
resources)
self.assertEqual(resources,
pipeline_manager.pipelines[0].resources)
def test_no_resources(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(len(pipeline_manager.pipelines[0].resources),
0)
self.assertEqual(0, len(pipeline_manager.pipelines[0].resources))
def _do_test_rate_of_change_mapping(self, pipe, meters, units):
now = timeutils.utcnow()
@ -1070,21 +1069,21 @@ class BasePipelineTestCase(test.BaseTestCase):
pipe.publish_samples(None, counters)
publisher = pipe.publishers[0]
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(2, len(publisher.samples))
bps = publisher.samples[0]
self.assertEqual(getattr(bps, 'name'), '%s.rate' % meters[0])
self.assertEqual(getattr(bps, 'resource_id'), 'resource1')
self.assertEqual(getattr(bps, 'unit'), '%s/s' % units[0])
self.assertEqual(getattr(bps, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(bps, 'volume'), rate)
self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name'))
self.assertEqual('resource1', getattr(bps, 'resource_id'))
self.assertEqual('%s/s' % units[0], getattr(bps, 'unit'))
self.assertEqual(sample.TYPE_GAUGE, getattr(bps, 'type'))
self.assertEqual(rate, getattr(bps, 'volume'))
rps = publisher.samples[1]
self.assertEqual(getattr(rps, 'name'), '%s.rate' % meters[1])
self.assertEqual(getattr(rps, 'resource_id'), 'resource2')
self.assertEqual(getattr(rps, 'unit'), '%s/s' % units[1])
self.assertEqual(getattr(rps, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(rps, 'volume'), rate)
self.assertEqual('%s.rate' % meters[1], getattr(rps, 'name'))
self.assertEqual('resource2', getattr(rps, 'resource_id'))
self.assertEqual('%s/s' % units[1], getattr(rps, 'unit'))
self.assertEqual(sample.TYPE_GAUGE, getattr(rps, 'type'))
self.assertEqual(rate, getattr(rps, 'volume'))
def test_rate_of_change_mapping(self):
map_from = {'name': 'disk\\.(read|write)\\.(bytes|requests)',

View File

@ -49,24 +49,24 @@ class PreciseTimestampTest(test.BaseTestCase):
def test_load_dialect_impl_mysql(self):
result = self._type.load_dialect_impl(self._mysql_dialect)
self.assertEqual(type(result), NUMERIC)
self.assertEqual(result.precision, 20)
self.assertEqual(result.scale, 6)
self.assertEqual(NUMERIC, type(result))
self.assertEqual(20, result.precision)
self.assertEqual(6, result.scale)
self.assertTrue(result.asdecimal)
def test_load_dialect_impl_postgres(self):
result = self._type.load_dialect_impl(self._postgres_dialect)
self.assertEqual(type(result), sqlalchemy.DateTime)
self.assertEqual(sqlalchemy.DateTime, type(result))
def test_process_bind_param_store_decimal_mysql(self):
expected = utils.dt_to_decimal(self._date)
result = self._type.process_bind_param(self._date, self._mysql_dialect)
self.assertEqual(result, expected)
self.assertEqual(expected, result)
def test_process_bind_param_store_datetime_postgres(self):
result = self._type.process_bind_param(self._date,
self._postgres_dialect)
self.assertEqual(result, self._date)
self.assertEqual(self._date, result)
def test_process_bind_param_store_none_mysql(self):
result = self._type.process_bind_param(None, self._mysql_dialect)
@ -81,12 +81,12 @@ class PreciseTimestampTest(test.BaseTestCase):
dec_value = utils.dt_to_decimal(self._date)
result = self._type.process_result_value(dec_value,
self._mysql_dialect)
self.assertEqual(result, self._date)
self.assertEqual(self._date, result)
def test_process_result_value_datetime_postgres(self):
result = self._type.process_result_value(self._date,
self._postgres_dialect)
self.assertEqual(result, self._date)
self.assertEqual(self._date, result)
def test_process_result_value_none_mysql(self):
result = self._type.process_result_value(None,

View File

@ -29,34 +29,32 @@ class BaseTest(test.BaseTestCase):
datetime.datetime(2013, 1, 1, 12, 0),
datetime.datetime(2013, 1, 1, 13, 0),
60))
self.assertEqual(len(times), 60)
self.assertEqual(times[10],
(datetime.datetime(2013, 1, 1, 12, 10),
datetime.datetime(2013, 1, 1, 12, 11)))
self.assertEqual(times[21],
(datetime.datetime(2013, 1, 1, 12, 21),
datetime.datetime(2013, 1, 1, 12, 22)))
self.assertEqual(60, len(times))
self.assertEqual((datetime.datetime(2013, 1, 1, 12, 10),
datetime.datetime(2013, 1, 1, 12, 11)), times[10])
self.assertEqual((datetime.datetime(2013, 1, 1, 12, 21),
datetime.datetime(2013, 1, 1, 12, 22)), times[21])
def test_iter_period_bis(self):
times = list(base.iter_period(
datetime.datetime(2013, 1, 2, 13, 0),
datetime.datetime(2013, 1, 2, 14, 0),
55))
self.assertEqual(len(times), math.ceil(3600 / 55.0))
self.assertEqual(times[10],
(datetime.datetime(2013, 1, 2, 13, 9, 10),
datetime.datetime(2013, 1, 2, 13, 10, 5)))
self.assertEqual(times[21],
(datetime.datetime(2013, 1, 2, 13, 19, 15),
datetime.datetime(2013, 1, 2, 13, 20, 10)))
self.assertEqual(math.ceil(3600 / 55.0), len(times))
self.assertEqual((datetime.datetime(2013, 1, 2, 13, 9, 10),
datetime.datetime(2013, 1, 2, 13, 10, 5)),
times[10])
self.assertEqual((datetime.datetime(2013, 1, 2, 13, 19, 15),
datetime.datetime(2013, 1, 2, 13, 20, 10)),
times[21])
def test_handle_sort_key(self):
sort_keys_alarm = base._handle_sort_key('alarm')
self.assertEqual(sort_keys_alarm, ['name', 'user_id', 'project_id'])
self.assertEqual(['name', 'user_id', 'project_id'], sort_keys_alarm)
sort_keys_meter = base._handle_sort_key('meter', 'foo')
self.assertEqual(sort_keys_meter, ['foo', 'user_id', 'project_id'])
self.assertEqual(['foo', 'user_id', 'project_id'], sort_keys_meter)
sort_keys_resource = base._handle_sort_key('resource', 'project_id')
self.assertEqual(sort_keys_resource,
['project_id', 'user_id', 'timestamp'])
self.assertEqual(['project_id', 'user_id', 'timestamp'],
sort_keys_resource)

View File

@ -55,7 +55,7 @@ class MongoDBConnection(MongoDBEngineTestBase):
marker=marker,
flag=flag)
expect = {'k3': {'$lt': 'v3'}, 'k2': {'eq': 'v2'}, 'k1': {'eq': 'v1'}}
self.assertEqual(ret, expect)
self.assertEqual(expect, ret)
class MongoDBTestMarkerBase(test_storage_scenarios.DBTestBase,
@ -66,14 +66,14 @@ class MongoDBTestMarkerBase(test_storage_scenarios.DBTestBase,
marker_pairs = {'user_id': 'user-id-4'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-4')
self.assertEqual('project-id-4', ret['project_id'])
def test_get_marker_None(self):
marker_pairs = {'user_id': 'user-id-foo'}
try:
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-foo')
self.assertEqual('project-id-foo', ret['project_id'])
except base.NoResultFound:
self.assertTrue(True)
@ -82,7 +82,7 @@ class MongoDBTestMarkerBase(test_storage_scenarios.DBTestBase,
marker_pairs = {'project_id': 'project-id'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-foo')
self.assertEqual('project-id-foo', ret['project_id'])
except base.MultipleResultsFound:
self.assertTrue(True)
@ -105,8 +105,9 @@ class IndexTest(MongoDBEngineTestBase):
self.conn.upgrade()
self.assertFalse(self.conn.db.meter.ensure_index('foo',
name='meter_ttl'))
self.assertEqual(self.conn.db.meter.index_information()[
'meter_ttl']['expireAfterSeconds'], 456789)
self.assertEqual(456789,
self.conn.db.meter.index_information()
['meter_ttl']['expireAfterSeconds'])
self.CONF.set_override('time_to_live', -1, group='database')
self.conn.upgrade()
@ -121,7 +122,7 @@ class AlarmTestPagination(test_storage_scenarios.AlarmTestBase,
marker_pairs = {'name': 'red-alert'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs=marker_pairs)
self.assertEqual(ret['rule']['meter_name'], 'test.one')
self.assertEqual('test.one', ret['rule']['meter_name'])
def test_alarm_get_marker_None(self):
self.add_some_alarms()
@ -129,8 +130,7 @@ class AlarmTestPagination(test_storage_scenarios.AlarmTestBase,
marker_pairs = {'name': 'user-id-foo'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs)
self.assertEqual(ret['rule']['meter_name'],
'meter_name-foo')
self.assertEqual('meter_name-foo', ret['rule']['meter_name'])
except base.NoResultFound:
self.assertTrue(True)
@ -140,8 +140,7 @@ class AlarmTestPagination(test_storage_scenarios.AlarmTestBase,
marker_pairs = {'user_id': 'me'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs)
self.assertEqual(ret['rule']['meter_name'],
'counter-name-foo')
self.assertEqual('counter-name-foo', ret['rule']['meter_name'])
except base.MultipleResultsFound:
self.assertTrue(True)

View File

@ -47,7 +47,7 @@ class CeilometerBaseTest(EventTestBase):
def test_ceilometer_base(self):
base = sql_models.CeilometerBase()
base['key'] = 'value'
self.assertEqual(base['key'], 'value')
self.assertEqual('value', base['key'])
class TraitTypeTest(EventTestBase):
@ -58,9 +58,9 @@ class TraitTypeTest(EventTestBase):
tt1 = self.conn._get_or_create_trait_type("foo", 0)
self.assertTrue(tt1.id >= 0)
tt2 = self.conn._get_or_create_trait_type("foo", 0)
self.assertEqual(tt1.id, tt2.id)
self.assertEqual(tt1.desc, tt2.desc)
self.assertEqual(tt1.data_type, tt2.data_type)
self.assertEqual(tt2.id, tt1.id)
self.assertEqual(tt2.desc, tt1.desc)
self.assertEqual(tt2.data_type, tt1.data_type)
def test_new_trait_type(self):
tt1 = self.conn._get_or_create_trait_type("foo", 0)
@ -76,7 +76,7 @@ class TraitTypeTest(EventTestBase):
self.assertTrue(tt1.id >= 0)
tt2 = self.conn._get_or_create_trait_type("foo", 1)
self.assertNotEqual(tt1.id, tt2.id)
self.assertEqual(tt1.desc, tt2.desc)
self.assertEqual(tt2.desc, tt1.desc)
self.assertNotEqual(tt1.data_type, tt2.data_type)
# Test the method __repr__ returns a string
self.assertTrue(repr.repr(tt2))
@ -90,8 +90,8 @@ class EventTypeTest(EventTestBase):
et1 = self.conn._get_or_create_event_type("foo")
self.assertTrue(et1.id >= 0)
et2 = self.conn._get_or_create_event_type("foo")
self.assertEqual(et1.id, et2.id)
self.assertEqual(et1.desc, et2.desc)
self.assertEqual(et2.id, et1.id)
self.assertEqual(et2.desc, et1.desc)
def test_event_type_unique(self):
et1 = self.conn._get_or_create_event_type("foo")
@ -111,43 +111,43 @@ class EventTest(EventTestBase):
def test_string_traits(self):
model = models.Trait("Foo", models.Trait.TEXT_TYPE, "my_text")
trait = self.conn._make_trait(model, None)
self.assertEqual(trait.trait_type.data_type, models.Trait.TEXT_TYPE)
self.assertEqual(models.Trait.TEXT_TYPE, trait.trait_type.data_type)
self.assertIsNone(trait.t_float)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_datetime)
self.assertEqual(trait.t_string, "my_text")
self.assertEqual("my_text", trait.t_string)
self.assertIsNotNone(trait.trait_type.desc)
def test_int_traits(self):
model = models.Trait("Foo", models.Trait.INT_TYPE, 100)
trait = self.conn._make_trait(model, None)
self.assertEqual(trait.trait_type.data_type, models.Trait.INT_TYPE)
self.assertEqual(models.Trait.INT_TYPE, trait.trait_type.data_type)
self.assertIsNone(trait.t_float)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_datetime)
self.assertEqual(trait.t_int, 100)
self.assertEqual(100, trait.t_int)
self.assertIsNotNone(trait.trait_type.desc)
def test_float_traits(self):
model = models.Trait("Foo", models.Trait.FLOAT_TYPE, 123.456)
trait = self.conn._make_trait(model, None)
self.assertEqual(trait.trait_type.data_type, models.Trait.FLOAT_TYPE)
self.assertEqual(models.Trait.FLOAT_TYPE, trait.trait_type.data_type)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_datetime)
self.assertEqual(trait.t_float, 123.456)
self.assertEqual(123.456, trait.t_float)
self.assertIsNotNone(trait.trait_type.desc)
def test_datetime_traits(self):
now = datetime.datetime.utcnow()
model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now)
trait = self.conn._make_trait(model, None)
self.assertEqual(trait.trait_type.data_type,
models.Trait.DATETIME_TYPE)
self.assertEqual(models.Trait.DATETIME_TYPE,
trait.trait_type.data_type)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_float)
self.assertEqual(trait.t_datetime, now)
self.assertEqual(now, trait.t_datetime)
self.assertIsNotNone(trait.trait_type.desc)
def test_bad_event(self):
@ -160,7 +160,7 @@ class EventTest(EventTestBase):
problem_events = self.conn.record_events(m)
self.assertEqual(2, len(problem_events))
for bad, event in problem_events:
self.assertEqual(models.Event.UNKNOWN_PROBLEM, bad)
self.assertEqual(bad, models.Event.UNKNOWN_PROBLEM)
def test_get_none_value_traits(self):
model = sql_models.Trait(None, None, 5)
@ -196,32 +196,32 @@ class RelationshipTest(scenarios.DBTestBase):
meta_tables = [sql_models.MetaText, sql_models.MetaFloat,
sql_models.MetaBigInt, sql_models.MetaBool]
for table in meta_tables:
self.assertEqual(session.query(table)
self.assertEqual(0, session.query(table)
.filter(~table.id.in_(
session.query(sql_models.Sample.id)
.group_by(sql_models.Sample.id)
)).count(), 0)
)).count())
def test_clear_metering_data_associations(self):
timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45)
self.conn.clear_expired_metering_data(3 * 60)
session = self.conn._get_db_session()
self.assertEqual(session.query(sql_models.sourceassoc)
self.assertEqual(0, session.query(sql_models.sourceassoc)
.filter(~sql_models.sourceassoc.c.sample_id.in_(
session.query(sql_models.Sample.id)
.group_by(sql_models.Sample.id)
)).count(), 0)
self.assertEqual(session.query(sql_models.sourceassoc)
)).count())
self.assertEqual(0, session.query(sql_models.sourceassoc)
.filter(~sql_models.sourceassoc.c.project_id.in_(
session.query(sql_models.Project.id)
.group_by(sql_models.Project.id)
)).count(), 0)
self.assertEqual(session.query(sql_models.sourceassoc)
)).count())
self.assertEqual(0, session.query(sql_models.sourceassoc)
.filter(~sql_models.sourceassoc.c.user_id.in_(
session.query(sql_models.User.id)
.group_by(sql_models.User.id)
)).count(), 0)
)).count())
class CapabilitiesTest(EventTestBase):

View File

@ -31,27 +31,29 @@ class ModelTest(test.BaseTestCase):
def test_create_attributes(self):
m = FakeModel(1, 2)
self.assertEqual(m.arg1, 1)
self.assertEqual(m.arg2, 2)
self.assertEqual(1, m.arg1)
self.assertEqual(2, m.arg2)
def test_as_dict(self):
m = FakeModel(1, 2)
d = m.as_dict()
self.assertEqual(d, {'arg1': 1, 'arg2': 2})
self.assertEqual({'arg1': 1, 'arg2': 2}, d)
def test_as_dict_recursive(self):
m = FakeModel(1, FakeModel('a', 'b'))
d = m.as_dict()
self.assertEqual(d, {'arg1': 1,
'arg2': {'arg1': 'a',
'arg2': 'b'}})
self.assertEqual({'arg1': 1,
'arg2': {'arg1': 'a',
'arg2': 'b'}},
d)
def test_as_dict_recursive_list(self):
m = FakeModel(1, [FakeModel('a', 'b')])
d = m.as_dict()
self.assertEqual(d, {'arg1': 1,
'arg2': [{'arg1': 'a',
'arg2': 'b'}]})
self.assertEqual({'arg1': 1,
'arg2': [{'arg1': 'a',
'arg2': 'b'}]},
d)
def test_event_repr_no_traits(self):
x = models.Event("1", "name", "now", None)
@ -91,19 +93,19 @@ class TestTraitModel(test.BaseTestCase):
def test_convert_value(self):
v = models.Trait.convert_value(
models.Trait.INT_TYPE, '10')
self.assertEqual(v, 10)
self.assertEqual(10, v)
self.assertIsInstance(v, int)
v = models.Trait.convert_value(
models.Trait.FLOAT_TYPE, '10')
self.assertEqual(v, 10.0)
self.assertEqual(10.0, v)
self.assertIsInstance(v, float)
v = models.Trait.convert_value(
models.Trait.DATETIME_TYPE, '2013-08-08 21:05:37.123456')
self.assertEqual(v, datetime.datetime(2013, 8, 8, 21, 5, 37, 123456))
self.assertEqual(datetime.datetime(2013, 8, 8, 21, 5, 37, 123456), v)
self.assertIsInstance(v, datetime.datetime)
v = models.Trait.convert_value(
models.Trait.TEXT_TYPE, 10)
self.assertEqual(v, "10")
self.assertEqual("10", v)
self.assertIsInstance(v, str)

View File

@ -46,7 +46,7 @@ class BinTestCase(base.BaseTestCase):
def test_dbsync_run(self):
subp = subprocess.Popen(['ceilometer-dbsync',
"--config-file=%s" % self.tempfile])
self.assertEqual(subp.wait(), 0)
self.assertEqual(0, subp.wait())
def test_run_expirer_ttl_disabled(self):
subp = subprocess.Popen(['ceilometer-expirer',
@ -54,7 +54,7 @@ class BinTestCase(base.BaseTestCase):
"--config-file=%s" % self.tempfile],
stderr=subprocess.PIPE)
__, err = subp.communicate()
self.assertEqual(subp.poll(), 0)
self.assertEqual(0, subp.poll())
self.assertIn("Nothing to clean", err)
def test_run_expirer_ttl_enabled(self):
@ -69,7 +69,7 @@ class BinTestCase(base.BaseTestCase):
"--config-file=%s" % self.tempfile],
stderr=subprocess.PIPE)
__, err = subp.communicate()
self.assertEqual(subp.poll(), 0)
self.assertEqual(0, subp.poll())
self.assertIn("Dropping data with TTL 1", err)
@ -94,7 +94,7 @@ class BinSendSampleTestCase(base.BaseTestCase):
"--config-file=%s" % self.tempfile,
"--sample-resource=someuuid",
"--sample-name=mycounter"])
self.assertEqual(subp.wait(), 0)
self.assertEqual(0, subp.wait())
class BinApiTestCase(base.BaseTestCase):
@ -145,10 +145,10 @@ class BinApiTestCase(base.BaseTestCase):
def test_v1(self):
response, content = self.get_response('v1/meters')
self.assertEqual(response.status, 200)
self.assertEqual(json.loads(content), {'meters': []})
self.assertEqual(200, response.status)
self.assertEqual({'meters': []}, json.loads(content))
def test_v2(self):
response, content = self.get_response('v2/meters')
self.assertEqual(response.status, 200)
self.assertEqual(json.loads(content), [])
self.assertEqual(200, response.status)
self.assertEqual([], json.loads(content))

View File

@ -77,30 +77,26 @@ class TestNotifications(test.BaseTestCase):
sample = list(middleware.HTTPRequest().process_notification(
HTTP_REQUEST
))[0]
self.assertEqual(sample.user_id,
HTTP_REQUEST['payload']['request']['HTTP_X_USER_ID'])
self.assertEqual(sample.project_id,
HTTP_REQUEST['payload']['request']
['HTTP_X_PROJECT_ID'])
self.assertEqual(sample.resource_id,
HTTP_REQUEST['payload']['request']
['HTTP_X_SERVICE_NAME'])
self.assertEqual(sample.volume, 1)
self.assertEqual(HTTP_REQUEST['payload']['request']['HTTP_X_USER_ID'],
sample.user_id)
self.assertEqual(HTTP_REQUEST['payload']['request']
['HTTP_X_PROJECT_ID'], sample.project_id)
self.assertEqual(HTTP_REQUEST['payload']['request']
['HTTP_X_SERVICE_NAME'], sample.resource_id)
self.assertEqual(1, sample.volume)
def test_process_response_notification(self):
sample = list(middleware.HTTPResponse().process_notification(
HTTP_RESPONSE
))[0]
self.assertEqual(sample.user_id,
HTTP_RESPONSE['payload']['request']['HTTP_X_USER_ID'])
self.assertEqual(sample.project_id,
HTTP_RESPONSE['payload']['request']
['HTTP_X_PROJECT_ID'])
self.assertEqual(sample.resource_id,
HTTP_RESPONSE['payload']['request']
['HTTP_X_SERVICE_NAME'])
self.assertEqual(sample.volume, 1)
self.assertEqual(HTTP_RESPONSE['payload']['request']['HTTP_X_USER_ID'],
sample.user_id)
self.assertEqual(HTTP_RESPONSE['payload']['request']
['HTTP_X_PROJECT_ID'], sample.project_id)
self.assertEqual(HTTP_RESPONSE['payload']['request']
['HTTP_X_SERVICE_NAME'], sample.resource_id)
self.assertEqual(1, sample.volume)
def test_exchanges(self):
topics = middleware.HTTPRequest().get_exchange_topics(self.CONF)
self.assertEqual(len(topics), 4)
self.assertEqual(4, len(topics))

View File

@ -81,7 +81,7 @@ class TestNotifier(test.BaseTestCase):
transformer_manager)
pub = notifier._pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(pub.samples), 0)
self.assertEqual(0, len(pub.samples))
notifier.notify(None, MESSAGE)
self.assertTrue(len(pub.samples) > 0)
self.assertIn('disk.ephemeral.size',

View File

@ -94,11 +94,11 @@ class TestNovaClient(test.BaseTestCase):
side_effect=self.fake_servers_list):
instances = self.nv.instance_get_all_by_host('foobar')
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].flavor['name'], 'm1.tiny')
self.assertEqual(instances[0].image['name'], 'ubuntu-12.04-x86')
self.assertEqual(instances[0].kernel_id, 11)
self.assertEqual(instances[0].ramdisk_id, 21)
self.assertEqual(1, len(instances))
self.assertEqual('m1.tiny', instances[0].flavor['name'])
self.assertEqual('ubuntu-12.04-x86', instances[0].image['name'])
self.assertEqual(11, instances[0].kernel_id)
self.assertEqual(21, instances[0].ramdisk_id)
@staticmethod
def fake_servers_list_unknown_flavor(*args, **kwargs):
@ -113,8 +113,8 @@ class TestNovaClient(test.BaseTestCase):
side_effect=self.fake_servers_list_unknown_flavor):
instances = self.nv.instance_get_all_by_host('foobar')
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].flavor['name'], 'unknown-id-666')
self.assertEqual(1, len(instances))
self.assertEqual('unknown-id-666', instances[0].flavor['name'])
@staticmethod
def fake_servers_list_unknown_image(*args, **kwargs):
@ -145,22 +145,22 @@ class TestNovaClient(test.BaseTestCase):
side_effect=self.fake_servers_list_unknown_image):
instances = self.nv.instance_get_all_by_host('foobar')
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].image['name'], 'unknown-id-666')
self.assertEqual(1, len(instances))
self.assertEqual('unknown-id-666', instances[0].image['name'])
def test_with_flavor_and_image(self):
results = self.nv._with_flavor_and_image(self.fake_servers_list())
instance = results[0]
self.assertEqual(instance.image['name'], 'ubuntu-12.04-x86')
self.assertEqual(instance.flavor['name'], 'm1.tiny')
self.assertEqual(instance.kernel_id, 11)
self.assertEqual(instance.ramdisk_id, 21)
self.assertEqual('ubuntu-12.04-x86', instance.image['name'])
self.assertEqual('m1.tiny', instance.flavor['name'])
self.assertEqual(11, instance.kernel_id)
self.assertEqual(21, instance.ramdisk_id)
def test_with_flavor_and_image_unknown_image(self):
instances = self.fake_servers_list_unknown_image()
results = self.nv._with_flavor_and_image(instances)
instance = results[0]
self.assertEqual(instance.image['name'], 'unknown-id-666')
self.assertEqual('unknown-id-666', instance.image['name'])
self.assertNotEqual(instance.flavor['name'], 'unknown-id-666')
self.assertIsNone(instance.kernel_id)
self.assertIsNone(instance.ramdisk_id)
@ -169,13 +169,13 @@ class TestNovaClient(test.BaseTestCase):
instances = self.fake_servers_list_unknown_flavor()
results = self.nv._with_flavor_and_image(instances)
instance = results[0]
self.assertEqual(instance.flavor['name'], 'unknown-id-666')
self.assertEqual(instance.flavor['vcpus'], 0)
self.assertEqual(instance.flavor['ram'], 0)
self.assertEqual(instance.flavor['disk'], 0)
self.assertEqual('unknown-id-666', instance.flavor['name'])
self.assertEqual(0, instance.flavor['vcpus'])
self.assertEqual(0, instance.flavor['ram'])
self.assertEqual(0, instance.flavor['disk'])
self.assertNotEqual(instance.image['name'], 'unknown-id-666')
self.assertEqual(instance.kernel_id, 11)
self.assertEqual(instance.ramdisk_id, 21)
self.assertEqual(11, instance.kernel_id)
self.assertEqual(21, instance.ramdisk_id)
def test_with_flavor_and_image_none_metadata(self):
instances = self.fake_servers_list_image_missing_metadata(3)
@ -195,7 +195,7 @@ class TestNovaClient(test.BaseTestCase):
instances = self.fake_servers_list_image_missing_metadata(5)
results = self.nv._with_flavor_and_image(instances)
instance = results[0]
self.assertEqual(instance.kernel_id, 11)
self.assertEqual(11, instance.kernel_id)
self.assertIsNone(instance.ramdisk_id)
def test_with_flavor_and_image_missing_kernel(self):
@ -203,7 +203,7 @@ class TestNovaClient(test.BaseTestCase):
results = self.nv._with_flavor_and_image(instances)
instance = results[0]
self.assertIsNone(instance.kernel_id)
self.assertEqual(instance.ramdisk_id, 21)
self.assertEqual(21, instance.ramdisk_id)
def test_with_missing_image_instance(self):
instances = self.fake_instance_image_missing()

View File

@ -106,4 +106,4 @@ class NotificationBaseTestCase(test.BaseTestCase):
c = self.FakeComputePlugin()
n = self.FakeNetworkPlugin()
self.assertTrue(len(list(c.to_samples(TEST_NOTIFICATION))) > 0)
self.assertEqual(len(list(n.to_samples(TEST_NOTIFICATION))), 0)
self.assertEqual(0, len(list(n.to_samples(TEST_NOTIFICATION))))

View File

@ -32,27 +32,22 @@ class TestUtils(test.BaseTestCase):
expected = 1356093296.12
utc_datetime = datetime.datetime.utcfromtimestamp(expected)
actual = utils.dt_to_decimal(utc_datetime)
self.assertEqual(float(actual), expected)
self.assertEqual(expected, float(actual))
def test_decimal_to_datetime(self):
expected = 1356093296.12
dexpected = decimal.Decimal(str(expected)) # Python 2.6 wants str()
expected_datetime = datetime.datetime.utcfromtimestamp(expected)
actual_datetime = utils.decimal_to_dt(dexpected)
self.assertEqual(actual_datetime, expected_datetime)
self.assertEqual(expected_datetime, actual_datetime)
def test_recursive_keypairs(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
}
data = {'a': 'A', 'b': 'B',
'nested': {'a': 'A', 'b': 'B'}}
pairs = list(utils.recursive_keypairs(data))
self.assertEqual(pairs, [('a', 'A'),
('b', 'B'),
('nested:a', 'A'),
('nested:b', 'B')])
self.assertEqual([('a', 'A'), ('b', 'B'),
('nested:a', 'A'), ('nested:b', 'B')],
pairs)
def test_recursive_keypairs_with_separator(self):
data = {'a': 'A',
@ -63,10 +58,11 @@ class TestUtils(test.BaseTestCase):
}
separator = '.'
pairs = list(utils.recursive_keypairs(data, separator))
self.assertEqual(pairs, [('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B')])
self.assertEqual([('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B')],
pairs)
def test_recursive_keypairs_with_list_of_dict(self):
small = 1
@ -80,7 +76,7 @@ class TestUtils(test.BaseTestCase):
'b': 'B',
'nested': {'list': [nested]}}
pairs = list(utils.recursive_keypairs(data))
self.assertEqual(pairs, expected)
self.assertEqual(expected, pairs)
def test_decimal_to_dt_with_none_parameter(self):
self.assertIsNone(utils.decimal_to_dt(None))
@ -94,9 +90,9 @@ class TestUtils(test.BaseTestCase):
'nested2': [{'c': 'A'}, {'c': 'B'}]
}
pairs = list(utils.dict_to_keyval(data))
self.assertEqual(pairs, [('a', 'A'),
('b', 'B'),
('nested2[0].c', 'A'),
('nested2[1].c', 'B'),
('nested.a', 'A'),
('nested.b', 'B')])
self.assertEqual([('a', 'A'), ('b', 'B'),
('nested2[0].c', 'A'),
('nested2[1].c', 'B'),
('nested.a', 'A'),
('nested.b', 'B')],
pairs)

View File

@ -105,61 +105,61 @@ class TestNotifications(test.BaseTestCase):
def _verify_common_sample(self, s, name, notification):
self.assertIsNotNone(s)
self.assertEqual(s.name, name)
self.assertEqual(s.resource_id, notification['payload']['volume_id'])
self.assertEqual(s.timestamp, notification['timestamp'])
self.assertEqual(notification['payload']['volume_id'], s.resource_id)
self.assertEqual(notification['timestamp'], s.timestamp)
metadata = s.resource_metadata
self.assertEqual(metadata.get('host'), notification['publisher_id'])
self.assertEqual(notification['publisher_id'], metadata.get('host'))
def test_volume_exists(self):
v = notifications.Volume()
samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume', NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(s.volume, 1)
self.assertEqual(1, s.volume)
def test_volume_size_exists(self):
v = notifications.VolumeSize()
samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume.size',
NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(s.volume,
NOTIFICATION_VOLUME_EXISTS['payload']['size'])
self.assertEqual(NOTIFICATION_VOLUME_EXISTS['payload']['size'],
s.volume)
def test_volume_delete(self):
v = notifications.Volume()
samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume', NOTIFICATION_VOLUME_DELETE)
self.assertEqual(s.volume, 1)
self.assertEqual(1, s.volume)
def test_volume_size_delete(self):
v = notifications.VolumeSize()
samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume.size',
NOTIFICATION_VOLUME_DELETE)
self.assertEqual(s.volume,
NOTIFICATION_VOLUME_DELETE['payload']['size'])
self.assertEqual(NOTIFICATION_VOLUME_DELETE['payload']['size'],
s.volume)
def test_volume_resize(self):
v = notifications.Volume()
samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume', NOTIFICATION_VOLUME_RESIZE)
self.assertEqual(s.volume, 1)
self.assertEqual(1, s.volume)
def test_volume_size_resize(self):
v = notifications.VolumeSize()
samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE))
self.assertEqual(len(samples), 1)
self.assertEqual(1, len(samples))
s = samples[0]
self._verify_common_sample(s, 'volume.size',
NOTIFICATION_VOLUME_RESIZE)
self.assertEqual(s.volume,
NOTIFICATION_VOLUME_RESIZE['payload']['size'])
self.assertEqual(NOTIFICATION_VOLUME_RESIZE['payload']['size'],
s.volume)