diff --git a/neutron/db/firewall/firewall_db.py b/neutron/db/firewall/firewall_db.py index 1e1f1ac8e3..0aa4192047 100644 --- a/neutron/db/firewall/firewall_db.py +++ b/neutron/db/firewall/firewall_db.py @@ -183,6 +183,9 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): # the integrity of this list. raise firewall.FirewallRuleNotFound(firewall_rule_id= fwrule_id) + elif rules_dict[fwrule_id]['firewall_policy_id']: + raise firewall.FirewallRuleInUse( + firewall_rule_id=fwrule_id) # New list of rules is valid so we will first reset the existing # list and then add each rule in order. # Note that the list could be empty in which case we interpret diff --git a/neutron/tests/unit/db/firewall/test_db_firewall.py b/neutron/tests/unit/db/firewall/test_db_firewall.py index 052f95a7ee..4c204e3719 100644 --- a/neutron/tests/unit/db/firewall/test_db_firewall.py +++ b/neutron/tests/unit/db/firewall/test_db_firewall.py @@ -316,6 +316,16 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): for k, v in attrs.iteritems(): self.assertEqual(fwp['firewall_policy'][k], v) + def test_create_firewall_policy_with_previously_associated_rule(self): + with self.firewall_rule() as fwr: + fw_rule_ids = [fwr['firewall_rule']['id']] + with self.firewall_policy(firewall_rules=fw_rule_ids): + res = self._create_firewall_policy( + None, 'firewall_policy2', description=DESCRIPTION, + shared=SHARED, firewall_rules=fw_rule_ids, + audited=AUDITED) + self.assertEqual(res.status_int, 409) + def test_show_firewall_policy(self): name = "firewall_policy1" attrs = self._get_test_firewall_policy_attrs(name) @@ -815,6 +825,20 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): expected_code=webob.exc.HTTPBadRequest.code, expected_body=None) + def test_insert_rule_for_previously_associated_rule(self): + with self.firewall_rule() as fwr: + fwr_id = fwr['firewall_rule']['id'] + fw_rule_ids = [fwr_id] + with self.firewall_policy(firewall_rules=fw_rule_ids): + with self.firewall_policy(name='firewall_policy2') as fwp: + fwp_id = fwp['firewall_policy']['id'] + insert_data = {'firewall_rule_id': fwr_id} + self._rule_action( + 'insert', fwp_id, fwr_id, insert_before=None, + insert_after=None, + expected_code=webob.exc.HTTPConflict.code, + expected_body=None, body_data=insert_data) + def test_insert_rule_in_policy(self): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False