Stop admin using other tenants unshared rules

If the firewall rules are not shared and if they belong to different
tenants, then admin should not be able to create a policy using
these rules and he should not be able to insert such rules into
policies. An exception should be raised in such case. Added new
exception “FirewallRuleConflict” to handle such conditions.

Co-Authored-By: Koteswara Rao Kelam<koteswara.kelam@hp.com>
Change-Id: I984eb76069bd1493a77bf523bec2bd81abb14abb
Closes-bug: 1327057
This commit is contained in:
Preeti Mirji 2014-07-23 03:22:30 -07:00 committed by Koteswara Rao Kelam
parent 94a7c1ee5c
commit 9c41d7c392
3 changed files with 65 additions and 3 deletions

View File

@ -162,6 +162,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
'enabled': firewall_rule['enabled']}
return self._fields(res, fields)
def _check_firewall_rule_conflict(self, fwr_db, fwp_db):
if not fwr_db['shared']:
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
raise firewall.FirewallRuleConflict(
firewall_rule_id=fwr_db['id'],
tenant_id=fwr_db['tenant_id'])
def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
rule_id_list = fwp['firewall_rules']
fwp_db = firewall_policy_db
@ -196,6 +203,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
raise firewall.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
for fwr_db in rules_in_db:
self._check_firewall_rule_conflict(fwr_db, fwp_db)
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret
@ -403,6 +412,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
def update_firewall_rule(self, context, id, firewall_rule):
LOG.debug(_("update_firewall_rule() called"))
fwr = firewall_rule['firewall_rule']
fwr_db = self._get_firewall_rule(context, id)
if fwr_db.firewall_policy_id:
fwp_db = self._get_firewall_policy(context,
fwr_db.firewall_policy_id)
if 'shared' in fwr and not fwr['shared']:
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
raise firewall.FirewallRuleInUse(firewall_rule_id=id)
if 'source_port' in fwr:
src_port_min, src_port_max = self._get_min_max_ports_from_range(
fwr['source_port'])
@ -416,7 +432,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
fwr['destination_port_range_max'] = dst_port_max
del fwr['destination_port']
with context.session.begin(subtransactions=True):
fwr_db = self._get_firewall_rule(context, id)
protocol = fwr.get('protocol', fwr_db['protocol'])
if not protocol:
sport = fwr.get('source_port_range_min',
@ -427,8 +442,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
fwr_db.update(fwr)
if fwr_db.firewall_policy_id:
fwp_db = self._get_firewall_policy(context,
fwr_db.firewall_policy_id)
fwp_db.audited = False
return self._make_firewall_rule_dict(fwr_db)
@ -476,8 +489,10 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
insert_before = False
with context.session.begin(subtransactions=True):
fwr_db = self._get_firewall_rule(context, firewall_rule_id)
fwp_db = self._get_firewall_policy(context, id)
if fwr_db.firewall_policy_id:
raise firewall.FirewallRuleInUse(firewall_rule_id=fwr_db['id'])
self._check_firewall_rule_conflict(fwr_db, fwp_db)
if ref_firewall_rule_id:
# If reference_firewall_rule_id is set, the new rule
# is inserted depending on the value of insert_before.

View File

@ -127,6 +127,19 @@ class FirewallInternalDriverError(qexception.NeutronException):
message = _("%(driver)s: Internal driver error.")
class FirewallRuleConflict(qexception.Conflict):
"""Firewall rule conflict exception.
Occurs when admin policy tries to use another tenant's unshared
rule.
"""
message = _("Operation cannot be performed since Firewall Rule "
"%(firewall_rule_id)s is not shared and belongs to "
"another tenant %(tenant_id)s")
fw_valid_protocol_values = [None, constants.TCP, constants.UDP, constants.ICMP]
fw_valid_action_values = [constants.FWAAS_ALLOW, constants.FWAAS_DENY]

View File

@ -335,6 +335,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
for k, v in attrs.iteritems():
self.assertEqual(fwp['firewall_policy'][k], v)
def test_create_admin_firewall_policy_with_other_tenant_rules(self):
with self.firewall_rule(shared=False) as fr:
fw_rule_ids = [fr['firewall_rule']['id']]
res = self._create_firewall_policy(None, 'firewall_policy1',
description=DESCRIPTION,
shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED,
tenant_id='admin-tenant')
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_firewall_policy_with_previously_associated_rule(self):
with self.firewall_rule() as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
@ -834,6 +845,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
[fwr_id])
self.assertEqual(res['firewall_policy']['audited'], False)
def test_update_firewall_rule_associated_with_other_tenant_policy(self):
with self.firewall_rule(shared=True, tenant_id='tenant1') as fwr:
fwr_id = [fwr['firewall_rule']['id']]
with self.firewall_policy(shared=False,
firewall_rules=fwr_id):
data = {'firewall_rule': {'shared': False}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_delete_firewall_rule(self):
ctx = context.get_admin_context()
with self.firewall_rule(do_delete=False) as fwr:
@ -1057,6 +1079,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)
def test_insert_rule_for_policy_of_other_tenant(self):
with self.firewall_rule(tenant_id='tenant-2', shared=False) as fwr:
fwr_id = fwr['firewall_rule']['id']
with self.firewall_policy(name='firewall_policy') as fwp:
fwp_id = fwp['firewall_policy']['id']
insert_data = {'firewall_rule_id': fwr_id}
self._rule_action(
'insert', fwp_id, fwr_id, insert_before=None,
insert_after=None,
expected_code=webob.exc.HTTPConflict.code,
expected_body=None, body_data=insert_data)
def test_insert_rule_in_policy(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False