diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py index c841bf322e..f5d4be886d 100644 --- a/neutron/tests/unit/test_security_groups_rpc.py +++ b/neutron/tests/unit/test_security_groups_rpc.py @@ -943,21 +943,15 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase): 'NoopFirewallDriver') -class SecurityGroupAgentRpcTestCase(base.BaseTestCase): +class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase): def setUp(self, defer_refresh_firewall=False): - super(SecurityGroupAgentRpcTestCase, self).setUp() - cfg.CONF.set_default('firewall_driver', - 'neutron.agent.firewall.NoopFirewallDriver', - group='SECURITYGROUP') + super(BaseSecurityGroupAgentRpcTestCase, self).setUp() + set_firewall_driver(FIREWALL_NOOP_DRIVER) self.agent = sg_rpc.SecurityGroupAgentRpcMixin() self.agent.context = None mock.patch('neutron.agent.linux.iptables_manager').start() self.agent.root_helper = 'sudo' - rpc = mock.Mock() - rpc.security_group_info_for_devices.side_effect = ( - messaging.UnsupportedVersion('1.2')) - self.agent.plugin_rpc = rpc - + self.agent.plugin_rpc = mock.Mock() self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) self.firewall = mock.Mock() firewall_object = firewall_base.FirewallDriver() @@ -970,9 +964,18 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): 'fake_sgid1', 'remote_group_id': 'fake_sgid2'}]} - fake_devices = {'fake_device': self.fake_device} - self.firewall.ports = fake_devices - rpc.security_group_rules_for_devices.return_value = fake_devices + self.firewall.ports = {'fake_device': self.fake_device} + + +class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase): + def setUp(self, defer_refresh_firewall=False): + super(SecurityGroupAgentRpcTestCase, self).setUp( + defer_refresh_firewall) + rpc = self.agent.plugin_rpc + rpc.security_group_info_for_devices.side_effect = ( + messaging.UnsupportedVersion('1.2')) + rpc.security_group_rules_for_devices.return_value = ( + self.firewall.ports) def test_prepare_and_remove_devices_filter(self): self.agent.prepare_devices_filter(['fake_device']) @@ -1041,39 +1044,20 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.firewall.assert_has_calls([]) -class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase): +class SecurityGroupAgentEnhancedRpcTestCase( + BaseSecurityGroupAgentRpcTestCase): + def setUp(self, defer_refresh_firewall=False): - super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp() - cfg.CONF.set_default('firewall_driver', - 'neutron.agent.firewall.NoopFirewallDriver', - group='SECURITYGROUP') - self.agent = sg_rpc.SecurityGroupAgentRpcMixin() - self.agent.context = None - mock.patch('neutron.agent.linux.iptables_manager').start() - self.agent.root_helper = 'sudo' - rpc = mock.Mock() - self.agent.plugin_rpc = rpc - self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) - self.firewall = mock.Mock() - firewall_object = firewall_base.FirewallDriver() - self.firewall.defer_apply.side_effect = firewall_object.defer_apply - self.agent.firewall = self.firewall - self.fake_device = {'device': 'fake_device', - 'security_groups': ['fake_sgid1', 'fake_sgid2'], - 'security_group_source_groups': ['fake_sgid2'], - 'security_group_rules': [{'security_group_id': - 'fake_sgid1', - 'remote_group_id': - 'fake_sgid2'}]} - fake_devices = {'fake_device': self.fake_device} + super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp( + defer_refresh_firewall=defer_refresh_firewall) fake_sg_info = { 'security_groups': { 'fake_sgid1': [ {'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []}, 'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}}, - 'devices': fake_devices} - self.firewall.ports = fake_devices - rpc.security_group_info_for_devices.return_value = fake_sg_info + 'devices': self.firewall.ports} + self.agent.plugin_rpc.security_group_info_for_devices.return_value = ( + fake_sg_info) def test_prepare_and_remove_devices_filter_enhanced_rpc(self): self.agent.prepare_devices_filter(['fake_device']) @@ -2015,16 +1999,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): PHYSDEV_INGRESS = 'physdev-out' PHYSDEV_EGRESS = 'physdev-in' - def setUp(self, defer_refresh_firewall=False): + def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True): super(TestSecurityGroupAgentWithIptables, self).setUp() config.register_root_helper(cfg.CONF) cfg.CONF.set_override( 'lock_path', '$state_path/lock') - cfg.CONF.set_override( - 'firewall_driver', - self.FIREWALL_DRIVER, - group='SECURITYGROUP') + set_firewall_driver(self.FIREWALL_DRIVER) self.agent = sg_rpc.SecurityGroupAgentRpcMixin() self.agent.context = None @@ -2033,8 +2014,10 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): self.agent.root_helper = 'sudo' self.rpc = mock.Mock() self.agent.plugin_rpc = self.rpc - self.rpc.security_group_info_for_devices.side_effect = ( - messaging.UnsupportedVersion('1.2')) + + if test_rpc_v1_1: + self.rpc.security_group_info_for_devices.side_effect = ( + messaging.UnsupportedVersion('1.2')) self.agent.init_firewall( defer_refresh_firewall=defer_refresh_firewall) @@ -2216,21 +2199,8 @@ class TestSecurityGroupAgentEnhancedRpcWithIptables( TestSecurityGroupAgentWithIptables): def setUp(self, defer_refresh_firewall=False): super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp( - defer_refresh_firewall) - self.rpc = mock.Mock() - self.agent.plugin_rpc = self.rpc + defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False) self.sg_info = self.rpc.security_group_info_for_devices - self.agent.init_firewall( - defer_refresh_firewall=defer_refresh_firewall) - self.iptables = self.agent.firewall.iptables - self.iptables_execute = mock.patch.object(self.iptables, - "execute").start() - self.iptables_execute_return_values = [] - self.expected_call_count = 0 - self.expected_calls = [] - self.expected_process_inputs = [] - self.iptables_execute.side_effect = ( - self.iptables_execute_return_values) rule1 = [{'direction': 'ingress', 'protocol': const.PROTO_NAME_UDP,