Merge "Avoid testing code duplication which introduced testing bugs"
This commit is contained in:
commit
e322e6f6b4
@ -943,21 +943,15 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
|
||||
'NoopFirewallDriver')
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
|
||||
class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase):
|
||||
def setUp(self, defer_refresh_firewall=False):
|
||||
super(SecurityGroupAgentRpcTestCase, self).setUp()
|
||||
cfg.CONF.set_default('firewall_driver',
|
||||
'neutron.agent.firewall.NoopFirewallDriver',
|
||||
group='SECURITYGROUP')
|
||||
super(BaseSecurityGroupAgentRpcTestCase, self).setUp()
|
||||
set_firewall_driver(FIREWALL_NOOP_DRIVER)
|
||||
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
|
||||
self.agent.context = None
|
||||
mock.patch('neutron.agent.linux.iptables_manager').start()
|
||||
self.agent.root_helper = 'sudo'
|
||||
rpc = mock.Mock()
|
||||
rpc.security_group_info_for_devices.side_effect = (
|
||||
messaging.UnsupportedVersion('1.2'))
|
||||
self.agent.plugin_rpc = rpc
|
||||
|
||||
self.agent.plugin_rpc = mock.Mock()
|
||||
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
|
||||
self.firewall = mock.Mock()
|
||||
firewall_object = firewall_base.FirewallDriver()
|
||||
@ -970,9 +964,18 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
|
||||
'fake_sgid1',
|
||||
'remote_group_id':
|
||||
'fake_sgid2'}]}
|
||||
fake_devices = {'fake_device': self.fake_device}
|
||||
self.firewall.ports = fake_devices
|
||||
rpc.security_group_rules_for_devices.return_value = fake_devices
|
||||
self.firewall.ports = {'fake_device': self.fake_device}
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
|
||||
def setUp(self, defer_refresh_firewall=False):
|
||||
super(SecurityGroupAgentRpcTestCase, self).setUp(
|
||||
defer_refresh_firewall)
|
||||
rpc = self.agent.plugin_rpc
|
||||
rpc.security_group_info_for_devices.side_effect = (
|
||||
messaging.UnsupportedVersion('1.2'))
|
||||
rpc.security_group_rules_for_devices.return_value = (
|
||||
self.firewall.ports)
|
||||
|
||||
def test_prepare_and_remove_devices_filter(self):
|
||||
self.agent.prepare_devices_filter(['fake_device'])
|
||||
@ -1041,39 +1044,20 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
|
||||
self.firewall.assert_has_calls([])
|
||||
|
||||
|
||||
class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
|
||||
class SecurityGroupAgentEnhancedRpcTestCase(
|
||||
BaseSecurityGroupAgentRpcTestCase):
|
||||
|
||||
def setUp(self, defer_refresh_firewall=False):
|
||||
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
|
||||
cfg.CONF.set_default('firewall_driver',
|
||||
'neutron.agent.firewall.NoopFirewallDriver',
|
||||
group='SECURITYGROUP')
|
||||
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
|
||||
self.agent.context = None
|
||||
mock.patch('neutron.agent.linux.iptables_manager').start()
|
||||
self.agent.root_helper = 'sudo'
|
||||
rpc = mock.Mock()
|
||||
self.agent.plugin_rpc = rpc
|
||||
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
|
||||
self.firewall = mock.Mock()
|
||||
firewall_object = firewall_base.FirewallDriver()
|
||||
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
|
||||
self.agent.firewall = self.firewall
|
||||
self.fake_device = {'device': 'fake_device',
|
||||
'security_groups': ['fake_sgid1', 'fake_sgid2'],
|
||||
'security_group_source_groups': ['fake_sgid2'],
|
||||
'security_group_rules': [{'security_group_id':
|
||||
'fake_sgid1',
|
||||
'remote_group_id':
|
||||
'fake_sgid2'}]}
|
||||
fake_devices = {'fake_device': self.fake_device}
|
||||
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp(
|
||||
defer_refresh_firewall=defer_refresh_firewall)
|
||||
fake_sg_info = {
|
||||
'security_groups': {
|
||||
'fake_sgid1': [
|
||||
{'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
|
||||
'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
|
||||
'devices': fake_devices}
|
||||
self.firewall.ports = fake_devices
|
||||
rpc.security_group_info_for_devices.return_value = fake_sg_info
|
||||
'devices': self.firewall.ports}
|
||||
self.agent.plugin_rpc.security_group_info_for_devices.return_value = (
|
||||
fake_sg_info)
|
||||
|
||||
def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
|
||||
self.agent.prepare_devices_filter(['fake_device'])
|
||||
@ -2015,16 +1999,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
|
||||
PHYSDEV_INGRESS = 'physdev-out'
|
||||
PHYSDEV_EGRESS = 'physdev-in'
|
||||
|
||||
def setUp(self, defer_refresh_firewall=False):
|
||||
def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
|
||||
super(TestSecurityGroupAgentWithIptables, self).setUp()
|
||||
config.register_root_helper(cfg.CONF)
|
||||
cfg.CONF.set_override(
|
||||
'lock_path',
|
||||
'$state_path/lock')
|
||||
cfg.CONF.set_override(
|
||||
'firewall_driver',
|
||||
self.FIREWALL_DRIVER,
|
||||
group='SECURITYGROUP')
|
||||
set_firewall_driver(self.FIREWALL_DRIVER)
|
||||
|
||||
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
|
||||
self.agent.context = None
|
||||
@ -2033,6 +2014,8 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
|
||||
self.agent.root_helper = 'sudo'
|
||||
self.rpc = mock.Mock()
|
||||
self.agent.plugin_rpc = self.rpc
|
||||
|
||||
if test_rpc_v1_1:
|
||||
self.rpc.security_group_info_for_devices.side_effect = (
|
||||
messaging.UnsupportedVersion('1.2'))
|
||||
|
||||
@ -2216,21 +2199,8 @@ class TestSecurityGroupAgentEnhancedRpcWithIptables(
|
||||
TestSecurityGroupAgentWithIptables):
|
||||
def setUp(self, defer_refresh_firewall=False):
|
||||
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
|
||||
defer_refresh_firewall)
|
||||
self.rpc = mock.Mock()
|
||||
self.agent.plugin_rpc = self.rpc
|
||||
defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)
|
||||
self.sg_info = self.rpc.security_group_info_for_devices
|
||||
self.agent.init_firewall(
|
||||
defer_refresh_firewall=defer_refresh_firewall)
|
||||
self.iptables = self.agent.firewall.iptables
|
||||
self.iptables_execute = mock.patch.object(self.iptables,
|
||||
"execute").start()
|
||||
self.iptables_execute_return_values = []
|
||||
self.expected_call_count = 0
|
||||
self.expected_calls = []
|
||||
self.expected_process_inputs = []
|
||||
self.iptables_execute.side_effect = (
|
||||
self.iptables_execute_return_values)
|
||||
|
||||
rule1 = [{'direction': 'ingress',
|
||||
'protocol': const.PROTO_NAME_UDP,
|
||||
|
Loading…
x
Reference in New Issue
Block a user