Merge "Stop admin using other tenants unshared rules"

This commit is contained in:
Jenkins 2014-09-26 23:02:22 +00:00 committed by Gerrit Code Review
commit 83ad9a5cbb
3 changed files with 65 additions and 3 deletions

View File

@ -162,6 +162,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
'enabled': firewall_rule['enabled']}
return self._fields(res, fields)
def _check_firewall_rule_conflict(self, fwr_db, fwp_db):
if not fwr_db['shared']:
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
raise firewall.FirewallRuleConflict(
firewall_rule_id=fwr_db['id'],
tenant_id=fwr_db['tenant_id'])
def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
rule_id_list = fwp['firewall_rules']
fwp_db = firewall_policy_db
@ -196,6 +203,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
raise firewall.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
for fwr_db in rules_in_db:
self._check_firewall_rule_conflict(fwr_db, fwp_db)
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret
@ -403,6 +412,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
def update_firewall_rule(self, context, id, firewall_rule):
LOG.debug(_("update_firewall_rule() called"))
fwr = firewall_rule['firewall_rule']
fwr_db = self._get_firewall_rule(context, id)
if fwr_db.firewall_policy_id:
fwp_db = self._get_firewall_policy(context,
fwr_db.firewall_policy_id)
if 'shared' in fwr and not fwr['shared']:
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
raise firewall.FirewallRuleInUse(firewall_rule_id=id)
if 'source_port' in fwr:
src_port_min, src_port_max = self._get_min_max_ports_from_range(
fwr['source_port'])
@ -416,7 +432,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
fwr['destination_port_range_max'] = dst_port_max
del fwr['destination_port']
with context.session.begin(subtransactions=True):
fwr_db = self._get_firewall_rule(context, id)
protocol = fwr.get('protocol', fwr_db['protocol'])
if not protocol:
sport = fwr.get('source_port_range_min',
@ -427,8 +442,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
fwr_db.update(fwr)
if fwr_db.firewall_policy_id:
fwp_db = self._get_firewall_policy(context,
fwr_db.firewall_policy_id)
fwp_db.audited = False
return self._make_firewall_rule_dict(fwr_db)
@ -476,8 +489,10 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
insert_before = False
with context.session.begin(subtransactions=True):
fwr_db = self._get_firewall_rule(context, firewall_rule_id)
fwp_db = self._get_firewall_policy(context, id)
if fwr_db.firewall_policy_id:
raise firewall.FirewallRuleInUse(firewall_rule_id=fwr_db['id'])
self._check_firewall_rule_conflict(fwr_db, fwp_db)
if ref_firewall_rule_id:
# If reference_firewall_rule_id is set, the new rule
# is inserted depending on the value of insert_before.

View File

@ -127,6 +127,19 @@ class FirewallInternalDriverError(qexception.NeutronException):
message = _("%(driver)s: Internal driver error.")
class FirewallRuleConflict(qexception.Conflict):
"""Firewall rule conflict exception.
Occurs when admin policy tries to use another tenant's unshared
rule.
"""
message = _("Operation cannot be performed since Firewall Rule "
"%(firewall_rule_id)s is not shared and belongs to "
"another tenant %(tenant_id)s")
fw_valid_protocol_values = [None, constants.TCP, constants.UDP, constants.ICMP]
fw_valid_action_values = [constants.FWAAS_ALLOW, constants.FWAAS_DENY]

View File

@ -335,6 +335,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
for k, v in attrs.iteritems():
self.assertEqual(fwp['firewall_policy'][k], v)
def test_create_admin_firewall_policy_with_other_tenant_rules(self):
with self.firewall_rule(shared=False) as fr:
fw_rule_ids = [fr['firewall_rule']['id']]
res = self._create_firewall_policy(None, 'firewall_policy1',
description=DESCRIPTION,
shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED,
tenant_id='admin-tenant')
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_firewall_policy_with_previously_associated_rule(self):
with self.firewall_rule() as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
@ -834,6 +845,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
[fwr_id])
self.assertEqual(res['firewall_policy']['audited'], False)
def test_update_firewall_rule_associated_with_other_tenant_policy(self):
with self.firewall_rule(shared=True, tenant_id='tenant1') as fwr:
fwr_id = [fwr['firewall_rule']['id']]
with self.firewall_policy(shared=False,
firewall_rules=fwr_id):
data = {'firewall_rule': {'shared': False}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_delete_firewall_rule(self):
ctx = context.get_admin_context()
with self.firewall_rule(do_delete=False) as fwr:
@ -1057,6 +1079,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)
def test_insert_rule_for_policy_of_other_tenant(self):
with self.firewall_rule(tenant_id='tenant-2', shared=False) as fwr:
fwr_id = fwr['firewall_rule']['id']
with self.firewall_policy(name='firewall_policy') as fwp:
fwp_id = fwp['firewall_policy']['id']
insert_data = {'firewall_rule_id': fwr_id}
self._rule_action(
'insert', fwp_id, fwr_id, insert_before=None,
insert_after=None,
expected_code=webob.exc.HTTPConflict.code,
expected_body=None, body_data=insert_data)
def test_insert_rule_in_policy(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False