vmware-nsx/neutron/tests/unit/test_common_utils.py
Jakub Libosvar 24078a6edb Common decorator for caching methods
Decorator caches methods' results, the key is composed from function
name and its parameters. Implementation of cache itself should be in
object implementing decorated method. Cache backend should be taken
from oslo-incubator.

Related-Bug: #1276440
Change-Id: Ibe3f1b2eb10b9601ad76ec24b7f0e3551b4013e2
2014-05-22 13:17:51 +02:00

384 lines
15 KiB
Python

# Copyright (c) 2012 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.plugins.common import utils as plugin_utils
from neutron.tests import base
class TestParseMappings(base.BaseTestCase):
def parse(self, mapping_list, unique_values=True):
return utils.parse_mappings(mapping_list, unique_values)
def test_parse_mappings_fails_for_missing_separator(self):
with testtools.ExpectedException(ValueError):
self.parse(['key'])
def test_parse_mappings_fails_for_missing_key(self):
with testtools.ExpectedException(ValueError):
self.parse([':val'])
def test_parse_mappings_fails_for_missing_value(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:'])
def test_parse_mappings_fails_for_extra_separator(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:val:junk'])
def test_parse_mappings_fails_for_duplicate_key(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:val1', 'key:val2'])
def test_parse_mappings_fails_for_duplicate_value(self):
with testtools.ExpectedException(ValueError):
self.parse(['key1:val', 'key2:val'])
def test_parse_mappings_succeeds_for_one_mapping(self):
self.assertEqual(self.parse(['key:val']), {'key': 'val'})
def test_parse_mappings_succeeds_for_n_mappings(self):
self.assertEqual(self.parse(['key1:val1', 'key2:val2']),
{'key1': 'val1', 'key2': 'val2'})
def test_parse_mappings_succeeds_for_duplicate_value(self):
self.assertEqual(self.parse(['key1:val', 'key2:val'], False),
{'key1': 'val', 'key2': 'val'})
def test_parse_mappings_succeeds_for_no_mappings(self):
self.assertEqual(self.parse(['']), {})
class UtilTestParseVlanRanges(base.BaseTestCase):
_err_prefix = "Invalid network VLAN range: '"
_err_too_few = "' - 'need more than 2 values to unpack'"
_err_too_many = "' - 'too many values to unpack'"
_err_not_int = "' - 'invalid literal for int() with base 10: '%s''"
_err_bad_vlan = "' - '%s is not a valid VLAN tag'"
_err_range = "' - 'End of VLAN range is less than start of VLAN range'"
def _range_too_few_err(self, nv_range):
return self._err_prefix + nv_range + self._err_too_few
def _range_too_many_err(self, nv_range):
return self._err_prefix + nv_range + self._err_too_many
def _vlan_not_int_err(self, nv_range, vlan):
return self._err_prefix + nv_range + (self._err_not_int % vlan)
def _nrange_invalid_vlan(self, nv_range, n):
vlan = nv_range.split(':')[n]
v_range = ':'.join(nv_range.split(':')[1:])
return self._err_prefix + v_range + (self._err_bad_vlan % vlan)
def _vrange_invalid_vlan(self, v_range_tuple, n):
vlan = v_range_tuple[n - 1]
v_range_str = '%d:%d' % v_range_tuple
return self._err_prefix + v_range_str + (self._err_bad_vlan % vlan)
def _vrange_invalid(self, v_range_tuple):
v_range_str = '%d:%d' % v_range_tuple
return self._err_prefix + v_range_str + self._err_range
class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
def verify_range(self, vlan_range):
return plugin_utils.verify_vlan_range(vlan_range)
def test_range_valid_ranges(self):
self.assertIsNone(self.verify_range((1, 2)))
self.assertIsNone(self.verify_range((1, 1999)))
self.assertIsNone(self.verify_range((100, 100)))
self.assertIsNone(self.verify_range((100, 200)))
self.assertIsNone(self.verify_range((4001, 4094)))
self.assertIsNone(self.verify_range((1, 4094)))
def check_one_vlan_invalid(self, bad_range, which):
expected_msg = self._vrange_invalid_vlan(bad_range, which)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.verify_range, bad_range)
self.assertEqual(str(err), expected_msg)
def test_range_first_vlan_invalid_negative(self):
self.check_one_vlan_invalid((-1, 199), 1)
def test_range_first_vlan_invalid_zero(self):
self.check_one_vlan_invalid((0, 199), 1)
def test_range_first_vlan_invalid_limit_plus_one(self):
self.check_one_vlan_invalid((4095, 199), 1)
def test_range_first_vlan_invalid_too_big(self):
self.check_one_vlan_invalid((9999, 199), 1)
def test_range_second_vlan_invalid_negative(self):
self.check_one_vlan_invalid((299, -1), 2)
def test_range_second_vlan_invalid_zero(self):
self.check_one_vlan_invalid((299, 0), 2)
def test_range_second_vlan_invalid_limit_plus_one(self):
self.check_one_vlan_invalid((299, 4095), 2)
def test_range_second_vlan_invalid_too_big(self):
self.check_one_vlan_invalid((299, 9999), 2)
def test_range_both_vlans_invalid_01(self):
self.check_one_vlan_invalid((-1, 0), 1)
def test_range_both_vlans_invalid_02(self):
self.check_one_vlan_invalid((0, 4095), 1)
def test_range_both_vlans_invalid_03(self):
self.check_one_vlan_invalid((4095, 9999), 1)
def test_range_both_vlans_invalid_04(self):
self.check_one_vlan_invalid((9999, -1), 1)
def test_range_reversed(self):
bad_range = (95, 10)
expected_msg = self._vrange_invalid(bad_range)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.verify_range, bad_range)
self.assertEqual(str(err), expected_msg)
class TestParseOneVlanRange(UtilTestParseVlanRanges):
def parse_one(self, cfg_entry):
return plugin_utils.parse_network_vlan_range(cfg_entry)
def test_parse_one_net_no_vlan_range(self):
config_str = "net1"
expected_networks = ("net1", None)
self.assertEqual(self.parse_one(config_str), expected_networks)
def test_parse_one_net_and_vlan_range(self):
config_str = "net1:100:199"
expected_networks = ("net1", (100, 199))
self.assertEqual(self.parse_one(config_str), expected_networks)
def test_parse_one_net_incomplete_range(self):
config_str = "net1:100"
expected_msg = self._range_too_few_err(config_str)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
def test_parse_one_net_range_too_many(self):
config_str = "net1:100:150:200"
expected_msg = self._range_too_many_err(config_str)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
def test_parse_one_net_vlan1_not_int(self):
config_str = "net1:foo:199"
expected_msg = self._vlan_not_int_err(config_str, 'foo')
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
def test_parse_one_net_vlan2_not_int(self):
config_str = "net1:100:bar"
expected_msg = self._vlan_not_int_err(config_str, 'bar')
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
def test_parse_one_net_and_max_range(self):
config_str = "net1:1:4094"
expected_networks = ("net1", (1, 4094))
self.assertEqual(self.parse_one(config_str), expected_networks)
def test_parse_one_net_range_bad_vlan1(self):
config_str = "net1:9000:150"
expected_msg = self._nrange_invalid_vlan(config_str, 1)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
def test_parse_one_net_range_bad_vlan2(self):
config_str = "net1:4000:4999"
expected_msg = self._nrange_invalid_vlan(config_str, 2)
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
class TestParseVlanRangeList(UtilTestParseVlanRanges):
def parse_list(self, cfg_entries):
return plugin_utils.parse_network_vlan_ranges(cfg_entries)
def test_parse_list_one_net_no_vlan_range(self):
config_list = ["net1"]
expected_networks = {"net1": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_list_one_net_vlan_range(self):
config_list = ["net1:100:199"]
expected_networks = {"net1": [(100, 199)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_two_nets_no_vlan_range(self):
config_list = ["net1",
"net2"]
expected_networks = {"net1": [],
"net2": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_two_nets_range_and_no_range(self):
config_list = ["net1:100:199",
"net2"]
expected_networks = {"net1": [(100, 199)],
"net2": []}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_two_nets_no_range_and_range(self):
config_list = ["net1",
"net2:200:299"]
expected_networks = {"net1": [],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_two_nets_bad_vlan_range1(self):
config_list = ["net1:100",
"net2:200:299"]
expected_msg = self._range_too_few_err(config_list[0])
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_list, config_list)
self.assertEqual(str(err), expected_msg)
def test_parse_two_nets_vlan_not_int2(self):
config_list = ["net1:100:199",
"net2:200:0x200"]
expected_msg = self._vlan_not_int_err(config_list[1], '0x200')
err = self.assertRaises(n_exc.NetworkVlanRangeError,
self.parse_list, config_list)
self.assertEqual(str(err), expected_msg)
def test_parse_two_nets_and_append_1_2(self):
config_list = ["net1:100:199",
"net1:1000:1099",
"net2:200:299"]
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
def test_parse_two_nets_and_append_1_3(self):
config_list = ["net1:100:199",
"net2:200:299",
"net1:1000:1099"]
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(self.parse_list(config_list), expected_networks)
class TestDictUtils(base.BaseTestCase):
def test_dict2str(self):
dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
expected = "key1=value1,key2=value2,key3=value3"
self.assertEqual(utils.dict2str(dic), expected)
def test_str2dict(self):
string = "key1=value1,key2=value2,key3=value3"
expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
self.assertEqual(utils.str2dict(string), expected)
def test_dict_str_conversion(self):
dic = {"key1": "value1", "key2": "value2"}
self.assertEqual(utils.str2dict(utils.dict2str(dic)), dic)
def test_diff_list_of_dict(self):
old_list = [{"key1": "value1"},
{"key2": "value2"},
{"key3": "value3"}]
new_list = [{"key1": "value1"},
{"key2": "value2"},
{"key4": "value4"}]
added, removed = utils.diff_list_of_dict(old_list, new_list)
self.assertEqual(added, [dict(key4="value4")])
self.assertEqual(removed, [dict(key3="value3")])
class _CachingDecorator(object):
def __init__(self):
self.func_retval = 'bar'
self._cache = mock.Mock()
@utils.cache_method_results
def func(self, *args, **kwargs):
return self.func_retval
class TestCachingDecorator(base.BaseTestCase):
def setUp(self):
super(TestCachingDecorator, self).setUp()
self.decor = _CachingDecorator()
self.func_name = '%(module)s._CachingDecorator.func' % {
'module': self.__module__
}
self.not_cached = self.decor.func.func.im_self._not_cached
def test_cache_miss(self):
expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
args = (1, 2)
kwargs = {'foo': 'bar'}
self.decor._cache.get.return_value = self.not_cached
retval = self.decor.func(*args, **kwargs)
self.decor._cache.set.assert_called_once_with(
expected_key, self.decor.func_retval, None)
self.assertEqual(self.decor.func_retval, retval)
def test_cache_hit(self):
expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
args = (1, 2)
kwargs = {'foo': 'bar'}
retval = self.decor.func(*args, **kwargs)
self.assertFalse(self.decor._cache.set.called)
self.assertEqual(self.decor._cache.get.return_value, retval)
self.decor._cache.get.assert_called_once_with(expected_key,
self.not_cached)
def test_get_unhashable(self):
expected_key = (self.func_name, [1], 2)
self.decor._cache.get.side_effect = TypeError
retval = self.decor.func([1], 2)
self.assertFalse(self.decor._cache.set.called)
self.assertEqual(self.decor.func_retval, retval)
self.decor._cache.get.assert_called_once_with(expected_key,
self.not_cached)
def test_missing_cache(self):
delattr(self.decor, '_cache')
self.assertRaises(NotImplementedError, self.decor.func, (1, 2))
def test_no_cache(self):
self.decor._cache = False
retval = self.decor.func((1, 2))
self.assertEqual(self.decor.func_retval, retval)
class TestDict2Tuples(base.BaseTestCase):
def test_dict(self):
input_dict = {'foo': 'bar', 42: 'baz', 'aaa': 'zzz'}
expected = ((42, 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
output_tuple = utils.dict2tuple(input_dict)
self.assertEqual(expected, output_tuple)