FWaaS - fix policy association of firewall rule

If an existing firewall rule already associated with a
firewall policy is associated with a different firewall
policy, the new association should fail. The check for
the existing association was not being made, hence the
firewall rule was being removed from the older policy
and being associated with the newer policy (incorrect
behavior). This is being fixed here.

If the association with the newer policy has to be made
the rule should first be removed from the existing policy
association.

Change-Id: I30c41d77e7fde673f0dccbc98e1cd7bd0d7b384f
Closes-Bug: #1223465
This commit is contained in:
Sumit Naiksatam 2013-09-14 13:38:08 -07:00
parent 4c8afcced7
commit dded354a5f
2 changed files with 27 additions and 0 deletions

View File

@ -183,6 +183,9 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
# the integrity of this list.
raise firewall.FirewallRuleNotFound(firewall_rule_id=
fwrule_id)
elif rules_dict[fwrule_id]['firewall_policy_id']:
raise firewall.FirewallRuleInUse(
firewall_rule_id=fwrule_id)
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret

View File

@ -316,6 +316,16 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
for k, v in attrs.iteritems():
self.assertEqual(fwp['firewall_policy'][k], v)
def test_create_firewall_policy_with_previously_associated_rule(self):
with self.firewall_rule() as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
with self.firewall_policy(firewall_rules=fw_rule_ids):
res = self._create_firewall_policy(
None, 'firewall_policy2', description=DESCRIPTION,
shared=SHARED, firewall_rules=fw_rule_ids,
audited=AUDITED)
self.assertEqual(res.status_int, 409)
def test_show_firewall_policy(self):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
@ -815,6 +825,20 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)
def test_insert_rule_for_previously_associated_rule(self):
with self.firewall_rule() as fwr:
fwr_id = fwr['firewall_rule']['id']
fw_rule_ids = [fwr_id]
with self.firewall_policy(firewall_rules=fw_rule_ids):
with self.firewall_policy(name='firewall_policy2') as fwp:
fwp_id = fwp['firewall_policy']['id']
insert_data = {'firewall_rule_id': fwr_id}
self._rule_action(
'insert', fwp_id, fwr_id, insert_before=None,
insert_after=None,
expected_code=webob.exc.HTTPConflict.code,
expected_body=None, body_data=insert_data)
def test_insert_rule_in_policy(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False