From 9335ffd7c3eaad0d66d7d19e7760ae12476a5ea2 Mon Sep 17 00:00:00 2001 From: Aaron Rosen Date: Thu, 21 Nov 2013 05:28:28 -0800 Subject: [PATCH] Fix unable to add allow all IPv4/6 security group rule Previously, if one tried to add a rule to allow all ingress ipv4 neutron would respond that the rule was already part of the security group. This happened as the filter for querying existing rules uses a wildcard for remote_group_id thus returning a false match. This patch addresses this issue. Change-Id: I0320013a3869d25fb424995354721929465d2848 Closes-bug: #1252806 --- neutron/db/securitygroups_db.py | 18 +++++++++-- .../unit/test_extension_security_group.py | 31 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 5986190c5d..2a7d2ef035 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -395,9 +395,21 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): # Check in database if rule exists filters = self._make_security_group_rule_filter_dict(i) - rules = self.get_security_group_rules(context, filters) - if rules: - raise ext_sg.SecurityGroupRuleExists(id=str(rules[0]['id'])) + db_rules = self.get_security_group_rules(context, filters) + # Note(arosen): the call to get_security_group_rules wildcards + # values in the filter that have a value of [None]. For + # example, filters = {'remote_group_id': [None]} will return + # all security group rules regardless of their value of + # remote_group_id. Therefore it is not possible to do this + # query unless the behavior of _get_collection() + # is changed which cannot be because other methods are already + # relying on this behavor. Therefore, we do the filtering + # below to check for these corner cases. + for db_rule in db_rules: + # need to remove id from db_rule for matching + id = db_rule.pop('id') + if (i['security_group_rule'] == db_rule): + raise ext_sg.SecurityGroupRuleExists(id=id) def get_security_group_rules(self, context, filters=None, fields=None, sorts=None, limit=None, marker=None, diff --git a/neutron/tests/unit/test_extension_security_group.py b/neutron/tests/unit/test_extension_security_group.py index d77f200a8e..8028ac4073 100644 --- a/neutron/tests/unit/test_extension_security_group.py +++ b/neutron/tests/unit/test_extension_security_group.py @@ -1175,6 +1175,37 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + def test_create_security_group_rule_allow_all_ipv4(self): + with self.security_group() as sg: + rule = {'security_group_id': sg['security_group']['id'], + 'direction': 'ingress', + 'ethertype': 'IPv4', + 'tenant_id': 'test_tenant'} + + res = self._create_security_group_rule( + self.fmt, {'security_group_rule': rule}) + rule = self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + + def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk " + "security_group_rule create") + with self.security_group() as sg: + rule_v4 = {'security_group_id': sg['security_group']['id'], + 'direction': 'ingress', + 'ethertype': 'IPv4', + 'tenant_id': 'test_tenant'} + rule_v6 = {'security_group_id': sg['security_group']['id'], + 'direction': 'ingress', + 'ethertype': 'IPv6', + 'tenant_id': 'test_tenant'} + + rules = {'security_group_rules': [rule_v4, rule_v6]} + res = self._create_security_group_rule(self.fmt, rules) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) + def test_create_security_group_rule_duplicate_rule_in_post(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk "