Mock agent RPC for FWaaS tests to delete DB objs

This changes the firewall service unit tests to
mock the RPC calls from the firewall service to
the agent. This allows the tests to fake the
agent response RPC that removes the firewall
from the DB so the all of the other objects that
the firewall depends on can be deleted.

Closes-Bug: #1288441
Change-Id: I32462ab5557b9c52328bf6a23a12efc2d799644c
This commit is contained in:
Kevin Benton 2014-03-04 03:27:11 -08:00 committed by Akihiro Motoki
parent ad57ebb143
commit 061f27b489
2 changed files with 162 additions and 170 deletions

View File

@ -20,6 +20,7 @@
import contextlib
import logging
import mock
import webob.exc
from neutron.api import extensions as api_ext
@ -28,9 +29,11 @@ from neutron import context
from neutron.db.firewall import firewall_db as fdb
import neutron.extensions
from neutron.extensions import firewall
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.services.firewall import fwaas_plugin
from neutron.tests.unit import test_db_plugin
@ -38,6 +41,8 @@ LOG = logging.getLogger(__name__)
DB_FW_PLUGIN_KLASS = (
"neutron.db.firewall.firewall_db.Firewall_db_mixin"
)
FWAAS_PLUGIN = 'neutron.services.firewall.fwaas_plugin'
DELETEFW_PATH = FWAAS_PLUGIN + '.FirewallAgentApi.delete_firewall'
extensions_path = ':'.join(neutron.extensions.__path__)
DESCRIPTION = 'default description'
SHARED = True
@ -53,6 +58,23 @@ ENABLED = True
ADMIN_STATE_UP = True
class FakeAgentApi(fwaas_plugin.FirewallCallbacks):
"""
This class used to mock the AgentAPI delete method inherits from
FirewallCallbacks because it needs access to the firewall_deleted method.
The delete_firewall method belongs to the FirewallAgentApi, which has
no access to the firewall_deleted method normally because it's not
responsible for deleting the firewall from the DB. However, it needs
to in the unit tests since there is no agent to call back.
"""
def __init__(self):
pass
def delete_firewall(self, context, firewall, **kwargs):
self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL']
self.firewall_deleted(context, firewall['id'], **kwargs)
class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
resource_prefix_map = dict(
(k, constants.COMMON_PREFIXES[constants.FIREWALL])
@ -60,6 +82,9 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
)
def setUp(self, core_plugin=None, fw_plugin=None, ext_mgr=None):
self.agentapi_delf_p = mock.patch(DELETEFW_PATH, create=True,
new=FakeAgentApi().delete_firewall)
self.agentapi_delf_p.start()
if not fw_plugin:
fw_plugin = DB_FW_PLUGIN_KLASS
service_plugins = {'fw_plugin_name': fw_plugin}
@ -304,18 +329,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
attrs['firewall_rules'] = fw_rule_ids
with self.firewall_policy(name=name, shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED,
no_delete=True) as fwp:
audited=AUDITED) as fwp:
for k, v in attrs.iteritems():
self.assertEqual(fwp['firewall_policy'][k], v)
@ -372,13 +393,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_update_firewall_policy_with_rules(self):
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy() as fwp:
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr:
with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
attrs['firewall_rules'] = fw_rule_ids
data = {'firewall_policy':
@ -394,26 +412,25 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_update_firewall_policy_replace_rules(self):
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy() as fwp:
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True)) as fr1:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3'),
self.firewall_rule(name='fwr4')) as frs:
fr1 = frs[0:2]
fr2 = frs[2:4]
with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
req.get_response(self.ext_api)
with contextlib.nested(self.firewall_rule(name='fwr3',
no_delete=True),
self.firewall_rule(name='fwr4',
no_delete=True)) as fr2:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr2]
attrs['firewall_rules'] = fw_rule_ids
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
new_data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', new_data,
fwp['firewall_policy']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
@ -424,15 +441,11 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_update_firewall_policy_reorder_rules(self):
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy() as fwp:
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True),
self.firewall_rule(name='fwr4',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3'),
self.firewall_rule(name='fwr4')) as fr:
with self.firewall_policy() as fwp:
fw_rule_ids = [fr[2]['firewall_rule']['id'],
fr[3]['firewall_rule']['id']]
data = {'firewall_policy':
@ -472,11 +485,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_update_firewall_policy_with_non_existing_rule(self):
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy() as fwp:
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2')) as fr:
with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
# appending non-existent rule
fw_rule_ids.append(uuidutils.generate_uuid())
@ -512,7 +523,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_rule(name='fwr1', no_delete=True) as fr:
with self.firewall_rule(name='fwr1') as fr:
fr_id = fr['firewall_rule']['id']
fw_rule_ids = [fr_id]
attrs['firewall_rules'] = fw_rule_ids
@ -534,7 +545,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_delete_firewall_policy_with_firewall_association(self):
attrs = self._get_test_firewall_attrs()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
@ -585,10 +596,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_show_firewall_rule_with_fw_policy_associated(self):
attrs = self._get_test_firewall_rule_attrs()
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall_rule(no_delete=True) as fw_rule:
with self.firewall_rule() as fw_rule:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
data = {'firewall_policy':
{'firewall_rules':
[fw_rule['firewall_rule']['id']]}}
@ -604,12 +615,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
self.assertEqual(res['firewall_rule'][k], v)
def test_list_firewall_rules(self):
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr:
query_params = 'protocol=tcp'
self._test_list_resources('firewall_rule', fr,
query_params=query_params)
@ -620,7 +628,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs['source_port'] = '10:20'
attrs['destination_port'] = '30:40'
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': '10:20',
'destination_port': '30:40'}}
@ -633,7 +641,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs['source_port'] = '10000'
attrs['destination_port'] = '80'
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': 10000,
'destination_port': 80}}
@ -646,7 +654,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs['source_port'] = '10000'
attrs['destination_port'] = '80'
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': '10000',
'destination_port': '80'}}
@ -659,7 +667,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs['source_port'] = None
attrs['destination_port'] = None
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': None,
'destination_port': None}}
@ -673,10 +681,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_update_firewall_rule_with_policy_associated(self):
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request('firewall_policies', data,
@ -712,10 +720,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_delete_firewall_rule_with_policy_associated(self):
attrs = self._get_test_firewall_rule_attrs()
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall_rule(no_delete=True) as fwr:
with self.firewall_rule() as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request('firewall_policies', data,
@ -729,7 +737,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
@ -743,7 +751,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
@ -759,7 +767,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
self.assertEqual(res['firewall'][k], v)
def test_list_firewalls(self):
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with contextlib.nested(self.firewall(name='fw1',
firewall_policy_id=fwp_id,
@ -793,7 +801,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
def test_delete_firewall(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
no_delete=True) as fw:
@ -809,13 +817,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True)) as fr1:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as frs:
fr1 = frs[0:2]
fwr3 = frs[2]
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
@ -828,19 +837,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
insert_after=None,
expected_code=webob.exc.HTTPConflict.code,
expected_body=None)
with self.firewall_rule(name='fwr3', no_delete=True) as fwr3:
fwr3_id = fwr3['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr3_id)
self._rule_action('insert', fwp_id, fwr3_id,
insert_before=fw_rule_ids[0],
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
fwr3_id = fwr3['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr3_id)
self._rule_action('insert', fwp_id, fwr3_id,
insert_before=fw_rule_ids[0],
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
def test_insert_rule_in_policy_failures(self):
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
with self.firewall_rule(name='fwr1') as fr1:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
fr1_id = fr1['firewall_rule']['id']
fw_rule_ids = [fr1_id]
data = {'firewall_policy':
@ -895,23 +903,16 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with contextlib.nested(self.firewall_rule(name='fwr0',
no_delete=True),
self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True),
self.firewall_rule(name='fwr4',
no_delete=True),
self.firewall_rule(name='fwr5',
no_delete=True),
self.firewall_rule(name='fwr6',
no_delete=True)) as fwr:
with contextlib.nested(self.firewall_rule(name='fwr0'),
self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3'),
self.firewall_rule(name='fwr4'),
self.firewall_rule(name='fwr5'),
self.firewall_rule(name='fwr6')) as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
# test insert when rule list is empty
fwr0_id = fwr[0]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr0_id)
@ -976,15 +977,12 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True)) as fr1:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr1:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
@ -1014,9 +1012,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
expected_body=None)
def test_remove_rule_from_policy_failures(self):
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
with self.firewall_rule(name='fwr1') as fr1:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [fr1['firewall_rule']['id']]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}

View File

@ -44,7 +44,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def test_set_firewall_status(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
@ -64,7 +64,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def test_firewall_deleted(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
@ -83,11 +83,12 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def test_firewall_deleted_error(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
with self.firewall(
firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.firewall_deleted(ctx, fw_id,
host='dummy')
@ -98,17 +99,15 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def test_get_firewall_for_tenant(self):
tenant_id = 'test-tenant'
ctx = context.Context('', tenant_id)
with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
with contextlib.nested(self.firewall_rule(name='fwr1',
tenant_id=tenant_id,
no_delete=True),
self.firewall_rule(name='fwr2',
tenant_id=tenant_id,
no_delete=True),
self.firewall_rule(name='fwr3',
tenant_id=tenant_id,
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1',
tenant_id=tenant_id),
self.firewall_rule(name='fwr2',
tenant_id=tenant_id),
self.firewall_rule(name='fwr3',
tenant_id=tenant_id)
) as fr:
with self.firewall_policy(tenant_id=tenant_id) as fwp:
fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
@ -120,8 +119,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
with self.firewall(firewall_policy_id=fwp_id,
tenant_id=tenant_id,
admin_state_up=
test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.get_firewalls_for_tenant(ctx,
host='dummy')
@ -136,13 +134,13 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def test_get_firewall_for_tenant_without_rules(self):
tenant_id = 'test-tenant'
ctx = context.Context('', tenant_id)
with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp:
with self.firewall_policy(tenant_id=tenant_id) as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
admin_state_up=test_db_firewall.ADMIN_STATE_UP
) as fw:
fw_list = [fw['firewall']]
f = self.callbacks.get_firewalls_for_tenant_without_rules
res = f(ctx, host='dummy')
@ -196,7 +194,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
self.callbacks = self.plugin.callbacks
def test_create_second_firewall_not_permitted(self):
with self.firewall(no_delete=True):
with self.firewall():
res = self._create_firewall(
None, 'firewall2', description='test',
firewall_policy_id=None, admin_state_up=True)
@ -207,7 +205,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
@ -230,7 +228,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
@ -246,7 +244,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
@ -259,9 +257,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
self.assertEqual(res.status_int, 409)
def test_update_firewall_rule_fails_when_firewall_pending(self):
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_rule(name='fwr1', no_delete=True) as fr:
with self.firewall_rule(name='fwr1') as fr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
fr_id = fr['firewall_rule']['id']
fw_rule_ids = [fr_id]
data = {'firewall_policy':
@ -271,8 +269,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
req.get_response(self.ext_api)
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
test_db_firewall.ADMIN_STATE_UP,
no_delete=True):
test_db_firewall.ADMIN_STATE_UP):
data = {'firewall_rule': {'protocol': 'udp'}}
req = self.new_update_request('firewall_rules',
data, fr_id)
@ -282,8 +279,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
def test_delete_firewall(self):
ctx = context.get_admin_context()
attrs = self._get_test_firewall_attrs()
with self.firewall_policy(no_delete=True) as fwp:
# stop the AgentRPC patch for this one to test pending states
self.agentapi_delf_p.stop()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
@ -298,33 +296,30 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
fw_db = self.plugin._get_firewall(ctx, fw_id)
for k, v in attrs.iteritems():
self.assertEqual(fw_db[k], v)
# cleanup the pending firewall
self.plugin.callbacks.firewall_deleted(ctx, fw_id)
def test_delete_firewall_after_agent_delete(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
no_delete=True) as fw:
fw_id = fw['firewall']['id']
with ctx.session.begin(subtransactions=True):
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.plugin.callbacks.firewall_deleted(ctx, fw_id)
self.assertRaises(firewall.FirewallNotFound,
self.plugin.get_firewall,
ctx, fw_id)
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.assertRaises(firewall.FirewallNotFound,
self.plugin.get_firewall,
ctx, fw_id)
def test_make_firewall_dict_with_in_place_rules(self):
ctx = context.get_admin_context()
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
with contextlib.nested(self.firewall_rule(name='fwr1',
no_delete=True),
self.firewall_rule(name='fwr2',
no_delete=True),
self.firewall_rule(name='fwr3',
no_delete=True)) as fr:
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
@ -335,8 +330,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
fw_rules = (
self.plugin._make_firewall_dict_with_rules(ctx,
@ -348,13 +342,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
def test_make_firewall_dict_with_in_place_rules_no_policy(self):
ctx = context.get_admin_context()
with self.firewall(no_delete=True) as fw:
with self.firewall() as fw:
fw_id = fw['firewall']['id']
fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id)
self.assertEqual(fw_rules['firewall_rule_list'], [])
def test_list_firewalls(self):
with self.firewall_policy(no_delete=True) as fwp:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name='fw1', firewall_policy_id=fwp_id,
description='fw') as fwalls: