c6ec44949e
Closes-Bug: #1209070 This commit addresses H102 and H103 hacking check. H102 Apache 2.0 license header not found H103 Header does not match Apache 2.0 License notice Also ignores H233 hacking check in tox.ini. H233 Python 3.x incompatible use of print operator Change-Id: I081964bf0b9e21614da2f146bd3ba8d28e211024
967 lines
47 KiB
Python
967 lines
47 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
#
|
|
# Copyright 2013 Big Switch Networks, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
|
|
|
|
import contextlib
|
|
import logging
|
|
|
|
import webob.exc
|
|
|
|
from neutron.api import extensions as api_ext
|
|
from neutron.common import config
|
|
from neutron import context
|
|
from neutron.db.firewall import firewall_db as fdb
|
|
import neutron.extensions
|
|
from neutron.extensions import firewall
|
|
from neutron.openstack.common import importutils
|
|
from neutron.plugins.common import constants
|
|
from neutron.tests.unit import test_db_plugin
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
DB_FW_PLUGIN_KLASS = (
|
|
"neutron.db.firewall.firewall_db.Firewall_db_mixin"
|
|
)
|
|
extensions_path = ':'.join(neutron.extensions.__path__)
|
|
DESCRIPTION = 'default description'
|
|
SHARED = True
|
|
PROTOCOL = 'tcp'
|
|
IP_VERSION = 4
|
|
SOURCE_IP_ADDRESS_RAW = '1.1.1.1'
|
|
DESTINATION_IP_ADDRESS_RAW = '2.2.2.2'
|
|
SOURCE_PORT = '55000:56000'
|
|
DESTINATION_PORT = '56000:57000'
|
|
ACTION = 'allow'
|
|
AUDITED = True
|
|
ENABLED = True
|
|
ADMIN_STATE_UP = True
|
|
|
|
|
|
class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
|
resource_prefix_map = dict(
|
|
(k, constants.COMMON_PREFIXES[constants.FIREWALL])
|
|
for k in firewall.RESOURCE_ATTRIBUTE_MAP.keys()
|
|
)
|
|
|
|
def setUp(self, core_plugin=None, fw_plugin=None):
|
|
if not fw_plugin:
|
|
fw_plugin = DB_FW_PLUGIN_KLASS
|
|
service_plugins = {'fw_plugin_name': fw_plugin}
|
|
|
|
fdb.Firewall_db_mixin.supported_extension_aliases = ["fwaas"]
|
|
super(FirewallPluginDbTestCase, self).setUp(
|
|
service_plugins=service_plugins
|
|
)
|
|
|
|
self.plugin = importutils.import_object(fw_plugin)
|
|
ext_mgr = api_ext.PluginAwareExtensionManager(
|
|
extensions_path,
|
|
{constants.FIREWALL: self.plugin}
|
|
)
|
|
app = config.load_paste_app('extensions_test_app')
|
|
self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr)
|
|
|
|
def _test_list_resources(self, resource, items,
|
|
neutron_context=None,
|
|
query_params=None):
|
|
if resource.endswith('y'):
|
|
resource_plural = resource.replace('y', 'ies')
|
|
else:
|
|
resource_plural = resource + 's'
|
|
|
|
res = self._list(resource_plural,
|
|
neutron_context=neutron_context,
|
|
query_params=query_params)
|
|
resource = resource.replace('-', '_')
|
|
self.assertEqual(sorted([i['id'] for i in res[resource_plural]]),
|
|
sorted([i[resource]['id'] for i in items]))
|
|
|
|
def _get_test_firewall_rule_attrs(self, name='firewall_rule1'):
|
|
attrs = {'name': name,
|
|
'tenant_id': self._tenant_id,
|
|
'shared': SHARED,
|
|
'protocol': PROTOCOL,
|
|
'ip_version': IP_VERSION,
|
|
'source_ip_address': SOURCE_IP_ADDRESS_RAW,
|
|
'destination_ip_address': DESTINATION_IP_ADDRESS_RAW,
|
|
'source_port': SOURCE_PORT,
|
|
'destination_port': DESTINATION_PORT,
|
|
'action': ACTION,
|
|
'enabled': ENABLED}
|
|
return attrs
|
|
|
|
def _get_test_firewall_policy_attrs(self, name='firewall_policy1'):
|
|
attrs = {'name': name,
|
|
'description': DESCRIPTION,
|
|
'tenant_id': self._tenant_id,
|
|
'shared': SHARED,
|
|
'firewall_rules': [],
|
|
'audited': AUDITED}
|
|
return attrs
|
|
|
|
def _get_test_firewall_attrs(self, name='firewall_1'):
|
|
attrs = {'name': name,
|
|
'tenant_id': self._tenant_id,
|
|
'admin_state_up': ADMIN_STATE_UP,
|
|
'status': 'PENDING_CREATE'}
|
|
|
|
return attrs
|
|
|
|
def _create_firewall_policy(self, fmt, name, description, shared,
|
|
firewall_rules, audited,
|
|
expected_res_status=None, **kwargs):
|
|
data = {'firewall_policy': {'name': name,
|
|
'description': description,
|
|
'tenant_id': self._tenant_id,
|
|
'shared': shared,
|
|
'firewall_rules': firewall_rules,
|
|
'audited': audited}}
|
|
|
|
fw_policy_req = self.new_create_request('firewall_policies', data, fmt)
|
|
fw_policy_res = fw_policy_req.get_response(self.ext_api)
|
|
if expected_res_status:
|
|
self.assertEqual(fw_policy_res.status_int, expected_res_status)
|
|
|
|
return fw_policy_res
|
|
|
|
def _replace_firewall_status(self, attrs, old_status, new_status):
|
|
if attrs['status'] is old_status:
|
|
attrs['status'] = new_status
|
|
return attrs
|
|
|
|
@contextlib.contextmanager
|
|
def firewall_policy(self, fmt=None, name='firewall_policy1',
|
|
description=DESCRIPTION, shared=True,
|
|
firewall_rules=None, audited=True,
|
|
no_delete=False, **kwargs):
|
|
if firewall_rules is None:
|
|
firewall_rules = []
|
|
if not fmt:
|
|
fmt = self.fmt
|
|
res = self._create_firewall_policy(fmt, name, description, shared,
|
|
firewall_rules, audited,
|
|
**kwargs)
|
|
if res.status_int >= 400:
|
|
raise webob.exc.HTTPClientError(code=res.status_int)
|
|
firewall_policy = self.deserialize(fmt or self.fmt, res)
|
|
try:
|
|
yield firewall_policy
|
|
finally:
|
|
if not no_delete:
|
|
self._delete('firewall_policies',
|
|
firewall_policy['firewall_policy']['id'])
|
|
|
|
def _create_firewall_rule(self, fmt, name, shared, protocol,
|
|
ip_version, source_ip_address,
|
|
destination_ip_address, source_port,
|
|
destination_port, action, enabled,
|
|
expected_res_status=None, **kwargs):
|
|
data = {'firewall_rule': {'name': name,
|
|
'tenant_id': self._tenant_id,
|
|
'shared': shared,
|
|
'protocol': protocol,
|
|
'ip_version': ip_version,
|
|
'source_ip_address': source_ip_address,
|
|
'destination_ip_address':
|
|
destination_ip_address,
|
|
'source_port': source_port,
|
|
'destination_port': destination_port,
|
|
'action': action,
|
|
'enabled': enabled}}
|
|
|
|
fw_rule_req = self.new_create_request('firewall_rules', data, fmt)
|
|
fw_rule_res = fw_rule_req.get_response(self.ext_api)
|
|
if expected_res_status:
|
|
self.assertEqual(fw_rule_res.status_int, expected_res_status)
|
|
|
|
return fw_rule_res
|
|
|
|
@contextlib.contextmanager
|
|
def firewall_rule(self, fmt=None, name='firewall_rule1',
|
|
shared=SHARED, protocol=PROTOCOL, ip_version=IP_VERSION,
|
|
source_ip_address=SOURCE_IP_ADDRESS_RAW,
|
|
destination_ip_address=DESTINATION_IP_ADDRESS_RAW,
|
|
source_port=SOURCE_PORT,
|
|
destination_port=DESTINATION_PORT,
|
|
action=ACTION, enabled=ENABLED,
|
|
no_delete=False, **kwargs):
|
|
if not fmt:
|
|
fmt = self.fmt
|
|
res = self._create_firewall_rule(fmt, name, shared, protocol,
|
|
ip_version, source_ip_address,
|
|
destination_ip_address,
|
|
source_port, destination_port,
|
|
action, enabled, **kwargs)
|
|
if res.status_int >= 400:
|
|
raise webob.exc.HTTPClientError(code=res.status_int)
|
|
firewall_rule = self.deserialize(fmt or self.fmt, res)
|
|
try:
|
|
yield firewall_rule
|
|
finally:
|
|
if not no_delete:
|
|
self._delete('firewall_rules',
|
|
firewall_rule['firewall_rule']['id'])
|
|
|
|
def _create_firewall(self, fmt, name, description, firewall_policy_id,
|
|
admin_state_up=True, expected_res_status=None,
|
|
**kwargs):
|
|
data = {'firewall': {'name': name,
|
|
'description': description,
|
|
'firewall_policy_id': firewall_policy_id,
|
|
'admin_state_up': admin_state_up,
|
|
'tenant_id': self._tenant_id}}
|
|
|
|
firewall_req = self.new_create_request('firewalls', data, fmt)
|
|
firewall_res = firewall_req.get_response(self.ext_api)
|
|
if expected_res_status:
|
|
self.assertEqual(firewall_res.status_int, expected_res_status)
|
|
|
|
return firewall_res
|
|
|
|
@contextlib.contextmanager
|
|
def firewall(self, fmt=None, name='firewall_1', description=DESCRIPTION,
|
|
firewall_policy_id=None, admin_state_up=True,
|
|
no_delete=False, **kwargs):
|
|
if not fmt:
|
|
fmt = self.fmt
|
|
res = self._create_firewall(fmt, name, description, firewall_policy_id,
|
|
admin_state_up, **kwargs)
|
|
if res.status_int >= 400:
|
|
raise webob.exc.HTTPClientError(code=res.status_int)
|
|
firewall = self.deserialize(fmt or self.fmt, res)
|
|
try:
|
|
yield firewall
|
|
finally:
|
|
if not no_delete:
|
|
self._delete('firewalls', firewall['firewall']['id'])
|
|
|
|
def _rule_action(self, action, id, firewall_rule_id, insert_before=None,
|
|
insert_after=None, expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=None, body_data=None):
|
|
# We intentionally do this check for None since we want to distinguish
|
|
# from empty dictionary
|
|
if body_data is None:
|
|
if action == 'insert':
|
|
body_data = {'firewall_rule_id': firewall_rule_id,
|
|
'insert_before': insert_before,
|
|
'insert_after': insert_after}
|
|
else:
|
|
body_data = {'firewall_rule_id': firewall_rule_id}
|
|
|
|
req = self.new_action_request('firewall_policies',
|
|
body_data, id,
|
|
"%s_rule" % action)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, expected_code)
|
|
response = self.deserialize(self.fmt, res)
|
|
if expected_body:
|
|
self.assertEqual(response, expected_body)
|
|
return response
|
|
|
|
def _compare_firewall_rule_lists(self, firewall_policy_id,
|
|
list1, list2):
|
|
position = 0
|
|
for r1, r2 in zip(list1, list2):
|
|
rule = r1['firewall_rule']
|
|
rule['firewall_policy_id'] = firewall_policy_id
|
|
position += 1
|
|
rule['position'] = position
|
|
for k in rule:
|
|
self.assertEqual(rule[k], r2[k])
|
|
|
|
|
|
class TestFirewallDBPlugin(FirewallPluginDbTestCase):
|
|
|
|
def test_create_firewall_policy(self):
|
|
name = "firewall_policy1"
|
|
attrs = self._get_test_firewall_policy_attrs(name)
|
|
|
|
with self.firewall_policy(name=name, shared=SHARED,
|
|
firewall_rules=None,
|
|
audited=AUDITED) as firewall_policy:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall_policy['firewall_policy'][k], v)
|
|
|
|
def test_create_firewall_policy_with_rules(self):
|
|
name = "firewall_policy1"
|
|
attrs = self._get_test_firewall_policy_attrs(name)
|
|
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr3',
|
|
no_delete=True)) as fr:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
|
|
attrs['firewall_rules'] = fw_rule_ids
|
|
with self.firewall_policy(name=name, shared=SHARED,
|
|
firewall_rules=fw_rule_ids,
|
|
audited=AUDITED,
|
|
no_delete=True) as fwp:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(fwp['firewall_policy'][k], v)
|
|
|
|
def test_show_firewall_policy(self):
|
|
name = "firewall_policy1"
|
|
attrs = self._get_test_firewall_policy_attrs(name)
|
|
|
|
with self.firewall_policy(name=name, shared=SHARED,
|
|
firewall_rules=None,
|
|
audited=AUDITED) as fwp:
|
|
req = self.new_show_request('firewall_policies',
|
|
fwp['firewall_policy']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_policy'][k], v)
|
|
|
|
def test_list_firewall_policies(self):
|
|
with contextlib.nested(self.firewall_policy(name='fwp1',
|
|
description='fwp'),
|
|
self.firewall_policy(name='fwp2',
|
|
description='fwp'),
|
|
self.firewall_policy(name='fwp3',
|
|
description='fwp')
|
|
) as fw_policies:
|
|
self._test_list_resources('firewall_policy',
|
|
fw_policies,
|
|
query_params='description=fwp')
|
|
|
|
def test_update_firewall_policy(self):
|
|
name = "new_firewall_policy1"
|
|
attrs = self._get_test_firewall_policy_attrs(name)
|
|
|
|
with self.firewall_policy(shared=SHARED,
|
|
firewall_rules=None,
|
|
audited=AUDITED) as fwp:
|
|
data = {'firewall_policy': {'name': name}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_policy'][k], v)
|
|
|
|
def test_update_firewall_policy_with_rules(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
|
|
with self.firewall_policy() as fwp:
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr3',
|
|
no_delete=True)) as fr:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
|
|
attrs['firewall_rules'] = fw_rule_ids
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
attrs['audited'] = False
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_policy'][k], v)
|
|
|
|
def test_update_firewall_policy_replace_rules(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
|
|
with self.firewall_policy() as fwp:
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True)) as fr1:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
req.get_response(self.ext_api)
|
|
with contextlib.nested(self.firewall_rule(name='fwr3',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr4',
|
|
no_delete=True)) as fr2:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr2]
|
|
attrs['firewall_rules'] = fw_rule_ids
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
attrs['audited'] = False
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_policy'][k], v)
|
|
|
|
def test_update_firewall_policy_with_non_existing_rule(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
|
|
with self.firewall_policy() as fwp:
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True)) as fr:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
|
|
fw_rule_ids.append('12345') # non-existent rule
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
res = req.get_response(self.ext_api)
|
|
#check that the firewall_rule was not found
|
|
self.assertEqual(res.status_int, 404)
|
|
#check if none of the rules got added to the policy
|
|
req = self.new_show_request('firewall_policies',
|
|
fwp['firewall_policy']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_policy'][k], v)
|
|
|
|
def test_delete_firewall_policy(self):
|
|
ctx = context.get_admin_context()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
req = self.new_delete_request('firewall_policies', fwp_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 204)
|
|
self.assertRaises(firewall.FirewallPolicyNotFound,
|
|
self.plugin.get_firewall_policy,
|
|
ctx, fwp_id)
|
|
|
|
def test_delete_firewall_policy_with_rule(self):
|
|
ctx = context.get_admin_context()
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
with self.firewall_rule(name='fwr1', no_delete=True) as fr:
|
|
fr_id = fr['firewall_rule']['id']
|
|
fw_rule_ids = [fr_id]
|
|
attrs['firewall_rules'] = fw_rule_ids
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
req.get_response(self.ext_api)
|
|
fw_rule = self.plugin.get_firewall_rule(ctx, fr_id)
|
|
self.assertEqual(fw_rule['firewall_policy_id'], fwp_id)
|
|
req = self.new_delete_request('firewall_policies', fwp_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 204)
|
|
self.assertRaises(firewall.FirewallPolicyNotFound,
|
|
self.plugin.get_firewall_policy,
|
|
ctx, fwp_id)
|
|
fw_rule = self.plugin.get_firewall_rule(ctx, fr_id)
|
|
self.assertEqual(fw_rule['firewall_policy_id'], None)
|
|
|
|
def test_delete_firewall_policy_with_firewall_association(self):
|
|
attrs = self._get_test_firewall_attrs()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall(firewall_policy_id=fwp_id,
|
|
admin_state_up=
|
|
ADMIN_STATE_UP):
|
|
req = self.new_delete_request('firewall_policies', fwp_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 409)
|
|
|
|
def test_create_firewall_rule(self):
|
|
attrs = self._get_test_firewall_rule_attrs()
|
|
|
|
with self.firewall_rule() as firewall_rule:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall_rule['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = None
|
|
attrs['destination_port'] = None
|
|
with self.firewall_rule(source_port=None,
|
|
destination_port=None) as firewall_rule:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall_rule['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = '10000'
|
|
attrs['destination_port'] = '80'
|
|
with self.firewall_rule(source_port=10000,
|
|
destination_port=80) as firewall_rule:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall_rule['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = '10000'
|
|
attrs['destination_port'] = '80'
|
|
with self.firewall_rule(source_port='10000',
|
|
destination_port='80') as firewall_rule:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall_rule['firewall_rule'][k], v)
|
|
|
|
def test_show_firewall_rule_with_fw_policy_not_associated(self):
|
|
attrs = self._get_test_firewall_rule_attrs()
|
|
with self.firewall_rule() as fw_rule:
|
|
req = self.new_show_request('firewall_rules',
|
|
fw_rule['firewall_rule']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
def test_show_firewall_rule_with_fw_policy_associated(self):
|
|
attrs = self._get_test_firewall_rule_attrs()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall_rule(no_delete=True) as fw_rule:
|
|
data = {'firewall_policy':
|
|
{'firewall_rules':
|
|
[fw_rule['firewall_rule']['id']]}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
req.get_response(self.ext_api)
|
|
req = self.new_show_request('firewall_rules',
|
|
fw_rule['firewall_rule']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
def test_list_firewall_rules(self):
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr3',
|
|
no_delete=True)) as fr:
|
|
query_params = 'protocol=tcp'
|
|
self._test_list_resources('firewall_rule', fr,
|
|
query_params=query_params)
|
|
|
|
def test_update_firewall_rule(self):
|
|
name = "new_firewall_rule1"
|
|
attrs = self._get_test_firewall_rule_attrs(name)
|
|
|
|
attrs['source_port'] = '10:20'
|
|
attrs['destination_port'] = '30:40'
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
data = {'firewall_rule': {'name': name,
|
|
'source_port': '10:20',
|
|
'destination_port': '30:40'}}
|
|
req = self.new_update_request('firewall_rules', data,
|
|
fwr['firewall_rule']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = '10000'
|
|
attrs['destination_port'] = '80'
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
data = {'firewall_rule': {'name': name,
|
|
'source_port': 10000,
|
|
'destination_port': 80}}
|
|
req = self.new_update_request('firewall_rules', data,
|
|
fwr['firewall_rule']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = '10000'
|
|
attrs['destination_port'] = '80'
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
data = {'firewall_rule': {'name': name,
|
|
'source_port': '10000',
|
|
'destination_port': '80'}}
|
|
req = self.new_update_request('firewall_rules', data,
|
|
fwr['firewall_rule']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
attrs['source_port'] = None
|
|
attrs['destination_port'] = None
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
data = {'firewall_rule': {'name': name,
|
|
'source_port': None,
|
|
'destination_port': None}}
|
|
req = self.new_update_request('firewall_rules', data,
|
|
fwr['firewall_rule']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
|
|
def test_update_firewall_rule_with_policy_associated(self):
|
|
name = "new_firewall_rule1"
|
|
attrs = self._get_test_firewall_rule_attrs(name)
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
fwr_id = fwr['firewall_rule']['id']
|
|
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
req.get_response(self.ext_api)
|
|
data = {'firewall_rule': {'name': name}}
|
|
req = self.new_update_request('firewall_rules', data,
|
|
fwr['firewall_rule']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall_rule'][k], v)
|
|
req = self.new_show_request('firewall_policies',
|
|
fwp['firewall_policy']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
self.assertEqual(res['firewall_policy']['firewall_rules'],
|
|
[fwr_id])
|
|
self.assertEqual(res['firewall_policy']['audited'], False)
|
|
|
|
def test_delete_firewall_rule(self):
|
|
ctx = context.get_admin_context()
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
fwr_id = fwr['firewall_rule']['id']
|
|
req = self.new_delete_request('firewall_rules', fwr_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 204)
|
|
self.assertRaises(firewall.FirewallRuleNotFound,
|
|
self.plugin.get_firewall_rule,
|
|
ctx, fwr_id)
|
|
|
|
def test_delete_firewall_rule_with_policy_associated(self):
|
|
attrs = self._get_test_firewall_rule_attrs()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall_rule(no_delete=True) as fwr:
|
|
fwr_id = fwr['firewall_rule']['id']
|
|
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp['firewall_policy']['id'])
|
|
req.get_response(self.ext_api)
|
|
req = self.new_delete_request('firewall_rules', fwr_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 409)
|
|
|
|
def test_create_firewall(self):
|
|
name = "firewall1"
|
|
attrs = self._get_test_firewall_attrs(name)
|
|
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall(name=name,
|
|
firewall_policy_id=fwp_id,
|
|
admin_state_up=
|
|
ADMIN_STATE_UP) as firewall:
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(firewall['firewall'][k], v)
|
|
|
|
def test_show_firewall(self):
|
|
name = "firewall1"
|
|
attrs = self._get_test_firewall_attrs(name)
|
|
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall(name=name,
|
|
firewall_policy_id=fwp_id,
|
|
admin_state_up=
|
|
ADMIN_STATE_UP) as firewall:
|
|
req = self.new_show_request('firewalls',
|
|
firewall['firewall']['id'],
|
|
fmt=self.fmt)
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall'][k], v)
|
|
|
|
def test_list_firewalls(self):
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
with contextlib.nested(self.firewall(name='fw1',
|
|
firewall_policy_id=fwp_id,
|
|
description='fw'),
|
|
self.firewall(name='fw2',
|
|
firewall_policy_id=fwp_id,
|
|
description='fw'),
|
|
self.firewall(name='fw3',
|
|
firewall_policy_id=fwp_id,
|
|
description='fw')) as fwalls:
|
|
self._test_list_resources('firewall', fwalls,
|
|
query_params='description=fw')
|
|
|
|
def test_update_firewall(self):
|
|
name = "new_firewall1"
|
|
attrs = self._get_test_firewall_attrs(name)
|
|
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['firewall_policy_id'] = fwp_id
|
|
with self.firewall(firewall_policy_id=fwp_id,
|
|
admin_state_up=
|
|
ADMIN_STATE_UP) as firewall:
|
|
data = {'firewall': {'name': name}}
|
|
req = self.new_update_request('firewalls', data,
|
|
firewall['firewall']['id'])
|
|
res = self.deserialize(self.fmt,
|
|
req.get_response(self.ext_api))
|
|
for k, v in attrs.iteritems():
|
|
self.assertEqual(res['firewall'][k], v)
|
|
|
|
def test_delete_firewall(self):
|
|
ctx = context.get_admin_context()
|
|
with self.firewall_policy(no_delete=True) as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
with self.firewall(firewall_policy_id=fwp_id,
|
|
no_delete=True) as fw:
|
|
fw_id = fw['firewall']['id']
|
|
req = self.new_delete_request('firewalls', fw_id)
|
|
res = req.get_response(self.ext_api)
|
|
self.assertEqual(res.status_int, 204)
|
|
self.assertRaises(firewall.FirewallNotFound,
|
|
self.plugin.get_firewall,
|
|
ctx, fw_id)
|
|
|
|
def test_insert_rule_in_policy_with_prior_rules_added_via_update(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
attrs['audited'] = False
|
|
attrs['firewall_list'] = []
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['id'] = fwp_id
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True)) as fr1:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
|
|
attrs['firewall_rules'] = fw_rule_ids[:]
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp_id)
|
|
req.get_response(self.ext_api)
|
|
self._rule_action('insert', fwp_id, fw_rule_ids[0],
|
|
insert_before=fw_rule_ids[0],
|
|
insert_after=None,
|
|
expected_code=webob.exc.HTTPConflict.code,
|
|
expected_body=None)
|
|
with self.firewall_rule(name='fwr3', no_delete=True) as fwr3:
|
|
fwr3_id = fwr3['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(0, fwr3_id)
|
|
self._rule_action('insert', fwp_id, fwr3_id,
|
|
insert_before=fw_rule_ids[0],
|
|
insert_after=None,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
|
|
def test_insert_rule_in_policy_failures(self):
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
|
|
fr1_id = fr1['firewall_rule']['id']
|
|
fw_rule_ids = [fr1_id]
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp_id)
|
|
req.get_response(self.ext_api)
|
|
# test inserting with empty request body
|
|
self._rule_action('insert', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPBadRequest.code,
|
|
expected_body=None, body_data={})
|
|
# test inserting when firewall_rule_id is missing in
|
|
# request body
|
|
insert_data = {'insert_before': '123',
|
|
'insert_after': '456'}
|
|
self._rule_action('insert', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPBadRequest.code,
|
|
expected_body=None,
|
|
body_data=insert_data)
|
|
# test inserting when firewall_rule_id is None
|
|
insert_data = {'firewall_rule_id': None,
|
|
'insert_before': '123',
|
|
'insert_after': '456'}
|
|
self._rule_action('insert', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPNotFound.code,
|
|
expected_body=None,
|
|
body_data=insert_data)
|
|
# test inserting when firewall_policy_id is incorrect
|
|
self._rule_action('insert', '123', fr1_id,
|
|
expected_code=webob.exc.HTTPNotFound.code,
|
|
expected_body=None)
|
|
# test inserting when firewall_policy_id is None
|
|
self._rule_action('insert', None, fr1_id,
|
|
expected_code=webob.exc.HTTPBadRequest.code,
|
|
expected_body=None)
|
|
|
|
def test_insert_rule_in_policy(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
attrs['audited'] = False
|
|
attrs['firewall_list'] = []
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['id'] = fwp_id
|
|
with contextlib.nested(self.firewall_rule(name='fwr0',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr3',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr4',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr5',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr6',
|
|
no_delete=True)) as fwr:
|
|
# test insert when rule list is empty
|
|
fwr0_id = fwr[0]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(0, fwr0_id)
|
|
self._rule_action('insert', fwp_id, fwr0_id,
|
|
insert_before=None,
|
|
insert_after=None,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
# test insert at top of rule list, insert_before and
|
|
# insert_after not provided
|
|
fwr1_id = fwr[1]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(0, fwr1_id)
|
|
insert_data = {'firewall_rule_id': fwr1_id}
|
|
self._rule_action('insert', fwp_id, fwr0_id,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs, body_data=insert_data)
|
|
# test insert at top of list above existing rule
|
|
fwr2_id = fwr[2]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(0, fwr2_id)
|
|
self._rule_action('insert', fwp_id, fwr2_id,
|
|
insert_before=fwr1_id,
|
|
insert_after=None,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
# test insert at bottom of list
|
|
fwr3_id = fwr[3]['firewall_rule']['id']
|
|
attrs['firewall_rules'].append(fwr3_id)
|
|
self._rule_action('insert', fwp_id, fwr3_id,
|
|
insert_before=None,
|
|
insert_after=fwr0_id,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
# test insert in the middle of the list using
|
|
# insert_before
|
|
fwr4_id = fwr[4]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(1, fwr4_id)
|
|
self._rule_action('insert', fwp_id, fwr4_id,
|
|
insert_before=fwr1_id,
|
|
insert_after=None,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
# test insert in the middle of the list using
|
|
# insert_after
|
|
fwr5_id = fwr[5]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(1, fwr5_id)
|
|
self._rule_action('insert', fwp_id, fwr5_id,
|
|
insert_before=None,
|
|
insert_after=fwr2_id,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
# test insert when both insert_before and
|
|
# insert_after are set
|
|
fwr6_id = fwr[6]['firewall_rule']['id']
|
|
attrs['firewall_rules'].insert(1, fwr6_id)
|
|
self._rule_action('insert', fwp_id, fwr6_id,
|
|
insert_before=fwr5_id,
|
|
insert_after=fwr5_id,
|
|
expected_code=webob.exc.HTTPOk.code,
|
|
expected_body=attrs)
|
|
|
|
def test_remove_rule_from_policy(self):
|
|
attrs = self._get_test_firewall_policy_attrs()
|
|
attrs['audited'] = False
|
|
attrs['firewall_list'] = []
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
attrs['id'] = fwp_id
|
|
with contextlib.nested(self.firewall_rule(name='fwr1',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr2',
|
|
no_delete=True),
|
|
self.firewall_rule(name='fwr3',
|
|
no_delete=True)) as fr1:
|
|
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
|
|
attrs['firewall_rules'] = fw_rule_ids[:]
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp_id)
|
|
req.get_response(self.ext_api)
|
|
# test removing a rule from a policy that does not exist
|
|
self._rule_action('remove', '123', fw_rule_ids[1],
|
|
expected_code=webob.exc.HTTPNotFound.code,
|
|
expected_body=None)
|
|
# test removing a rule in the middle of the list
|
|
attrs['firewall_rules'].remove(fw_rule_ids[1])
|
|
self._rule_action('remove', fwp_id, fw_rule_ids[1],
|
|
expected_body=attrs)
|
|
# test removing a rule at the top of the list
|
|
attrs['firewall_rules'].remove(fw_rule_ids[0])
|
|
self._rule_action('remove', fwp_id, fw_rule_ids[0],
|
|
expected_body=attrs)
|
|
# test removing remaining rule in the list
|
|
attrs['firewall_rules'].remove(fw_rule_ids[2])
|
|
self._rule_action('remove', fwp_id, fw_rule_ids[2],
|
|
expected_body=attrs)
|
|
# test removing rule that is not associated with the policy
|
|
self._rule_action('remove', fwp_id, fw_rule_ids[2],
|
|
expected_code=webob.exc.HTTPBadRequest.code,
|
|
expected_body=None)
|
|
|
|
def test_remove_rule_from_policy_failures(self):
|
|
with self.firewall_policy() as fwp:
|
|
fwp_id = fwp['firewall_policy']['id']
|
|
with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
|
|
fw_rule_ids = [fr1['firewall_rule']['id']]
|
|
data = {'firewall_policy':
|
|
{'firewall_rules': fw_rule_ids}}
|
|
req = self.new_update_request('firewall_policies', data,
|
|
fwp_id)
|
|
req.get_response(self.ext_api)
|
|
# test removing rule that does not exist
|
|
self._rule_action('remove', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPNotFound.code,
|
|
expected_body=None)
|
|
# test removing rule with bad request
|
|
self._rule_action('remove', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPBadRequest.code,
|
|
expected_body=None, body_data={})
|
|
# test removing rule with firewall_rule_id set to None
|
|
self._rule_action('remove', fwp_id, '123',
|
|
expected_code=webob.exc.HTTPNotFound.code,
|
|
expected_body=None,
|
|
body_data={'firewall_rule_id': None})
|
|
|
|
|
|
class TestFirewallDBPluginXML(TestFirewallDBPlugin):
|
|
fmt = 'xml'
|