diff --git a/neutron/tests/unit/db/firewall/test_db_firewall.py b/neutron/tests/unit/db/firewall/test_db_firewall.py index aa7f3efa6f..010aa9d1ef 100644 --- a/neutron/tests/unit/db/firewall/test_db_firewall.py +++ b/neutron/tests/unit/db/firewall/test_db_firewall.py @@ -20,6 +20,7 @@ import contextlib import logging +import mock import webob.exc from neutron.api import extensions as api_ext @@ -28,9 +29,11 @@ from neutron import context from neutron.db.firewall import firewall_db as fdb import neutron.extensions from neutron.extensions import firewall +from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import uuidutils from neutron.plugins.common import constants +from neutron.services.firewall import fwaas_plugin from neutron.tests.unit import test_db_plugin @@ -38,6 +41,8 @@ LOG = logging.getLogger(__name__) DB_FW_PLUGIN_KLASS = ( "neutron.db.firewall.firewall_db.Firewall_db_mixin" ) +FWAAS_PLUGIN = 'neutron.services.firewall.fwaas_plugin' +DELETEFW_PATH = FWAAS_PLUGIN + '.FirewallAgentApi.delete_firewall' extensions_path = ':'.join(neutron.extensions.__path__) DESCRIPTION = 'default description' SHARED = True @@ -53,6 +58,23 @@ ENABLED = True ADMIN_STATE_UP = True +class FakeAgentApi(fwaas_plugin.FirewallCallbacks): + """ + This class used to mock the AgentAPI delete method inherits from + FirewallCallbacks because it needs access to the firewall_deleted method. + The delete_firewall method belongs to the FirewallAgentApi, which has + no access to the firewall_deleted method normally because it's not + responsible for deleting the firewall from the DB. However, it needs + to in the unit tests since there is no agent to call back. + """ + def __init__(self): + pass + + def delete_firewall(self, context, firewall, **kwargs): + self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL'] + self.firewall_deleted(context, firewall['id'], **kwargs) + + class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): resource_prefix_map = dict( (k, constants.COMMON_PREFIXES[constants.FIREWALL]) @@ -60,6 +82,9 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): ) def setUp(self, core_plugin=None, fw_plugin=None, ext_mgr=None): + self.agentapi_delf_p = mock.patch(DELETEFW_PATH, create=True, + new=FakeAgentApi().delete_firewall) + self.agentapi_delf_p.start() if not fw_plugin: fw_plugin = DB_FW_PLUGIN_KLASS service_plugins = {'fw_plugin_name': fw_plugin} @@ -304,18 +329,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall_policy1" attrs = self._get_test_firewall_policy_attrs(name) - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] attrs['firewall_rules'] = fw_rule_ids with self.firewall_policy(name=name, shared=SHARED, firewall_rules=fw_rule_ids, - audited=AUDITED, - no_delete=True) as fwp: + audited=AUDITED) as fwp: for k, v in attrs.iteritems(): self.assertEqual(fwp['firewall_policy'][k], v) @@ -372,13 +393,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_with_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] attrs['firewall_rules'] = fw_rule_ids data = {'firewall_policy': @@ -394,26 +412,25 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_replace_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4')) as frs: + fr1 = frs[0:2] + fr2 = frs[2:4] + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} req = self.new_update_request('firewall_policies', data, fwp['firewall_policy']['id']) req.get_response(self.ext_api) - with contextlib.nested(self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True)) as fr2: + fw_rule_ids = [r['firewall_rule']['id'] for r in fr2] attrs['firewall_rules'] = fw_rule_ids - data = {'firewall_policy': - {'firewall_rules': fw_rule_ids}} - req = self.new_update_request('firewall_policies', data, + new_data = {'firewall_policy': + {'firewall_rules': fw_rule_ids}} + req = self.new_update_request('firewall_policies', new_data, fwp['firewall_policy']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) @@ -424,15 +441,11 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_reorder_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [fr[2]['firewall_rule']['id'], fr[3]['firewall_rule']['id']] data = {'firewall_policy': @@ -472,11 +485,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_with_non_existing_rule(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] # appending non-existent rule fw_rule_ids.append(uuidutils.generate_uuid()) @@ -512,7 +523,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() with self.firewall_policy(no_delete=True) as fwp: fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr: + with self.firewall_rule(name='fwr1') as fr: fr_id = fr['firewall_rule']['id'] fw_rule_ids = [fr_id] attrs['firewall_rules'] = fw_rule_ids @@ -534,7 +545,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall_policy_with_firewall_association(self): attrs = self._get_test_firewall_attrs() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -585,10 +596,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_show_firewall_rule_with_fw_policy_associated(self): attrs = self._get_test_firewall_rule_attrs() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fw_rule: + with self.firewall_rule() as fw_rule: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id data = {'firewall_policy': {'firewall_rules': [fw_rule['firewall_rule']['id']]}} @@ -604,12 +615,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): self.assertEqual(res['firewall_rule'][k], v) def test_list_firewall_rules(self): - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: query_params = 'protocol=tcp' self._test_list_resources('firewall_rule', fr, query_params=query_params) @@ -620,7 +628,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10:20' attrs['destination_port'] = '30:40' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': '10:20', 'destination_port': '30:40'}} @@ -633,7 +641,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10000' attrs['destination_port'] = '80' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': 10000, 'destination_port': 80}} @@ -646,7 +654,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10000' attrs['destination_port'] = '80' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': '10000', 'destination_port': '80'}} @@ -659,7 +667,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = None attrs['destination_port'] = None - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': None, 'destination_port': None}} @@ -673,10 +681,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_rule_with_policy_associated(self): name = "new_firewall_rule1" attrs = self._get_test_firewall_rule_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id fwr_id = fwr['firewall_rule']['id'] data = {'firewall_policy': {'firewall_rules': [fwr_id]}} req = self.new_update_request('firewall_policies', data, @@ -712,10 +720,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall_rule_with_policy_associated(self): attrs = self._get_test_firewall_rule_attrs() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id fwr_id = fwr['firewall_rule']['id'] data = {'firewall_policy': {'firewall_rules': [fwr_id]}} req = self.new_update_request('firewall_policies', data, @@ -729,7 +737,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(name=name, @@ -743,7 +751,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(name=name, @@ -759,7 +767,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): self.assertEqual(res['firewall'][k], v) def test_list_firewalls(self): - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with contextlib.nested(self.firewall(name='fw1', firewall_policy_id=fwp_id, @@ -793,7 +801,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, no_delete=True) as fw: @@ -809,13 +817,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as frs: + fr1 = frs[0:2] + fwr3 = frs[2] + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] attrs['firewall_rules'] = fw_rule_ids[:] data = {'firewall_policy': @@ -828,19 +837,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): insert_after=None, expected_code=webob.exc.HTTPConflict.code, expected_body=None) - with self.firewall_rule(name='fwr3', no_delete=True) as fwr3: - fwr3_id = fwr3['firewall_rule']['id'] - attrs['firewall_rules'].insert(0, fwr3_id) - self._rule_action('insert', fwp_id, fwr3_id, - insert_before=fw_rule_ids[0], - insert_after=None, - expected_code=webob.exc.HTTPOk.code, - expected_body=attrs) + fwr3_id = fwr3['firewall_rule']['id'] + attrs['firewall_rules'].insert(0, fwr3_id) + self._rule_action('insert', fwp_id, fwr3_id, + insert_before=fw_rule_ids[0], + insert_after=None, + expected_code=webob.exc.HTTPOk.code, + expected_body=attrs) def test_insert_rule_in_policy_failures(self): - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr1: + with self.firewall_rule(name='fwr1') as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fr1_id = fr1['firewall_rule']['id'] fw_rule_ids = [fr1_id] data = {'firewall_policy': @@ -895,23 +903,16 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr0', - no_delete=True), - self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True), - self.firewall_rule(name='fwr5', - no_delete=True), - self.firewall_rule(name='fwr6', - no_delete=True)) as fwr: + with contextlib.nested(self.firewall_rule(name='fwr0'), + self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4'), + self.firewall_rule(name='fwr5'), + self.firewall_rule(name='fwr6')) as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id # test insert when rule list is empty fwr0_id = fwr[0]['firewall_rule']['id'] attrs['firewall_rules'].insert(0, fwr0_id) @@ -976,15 +977,12 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] attrs['firewall_rules'] = fw_rule_ids[:] data = {'firewall_policy': @@ -1014,9 +1012,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): expected_body=None) def test_remove_rule_from_policy_failures(self): - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr1: + with self.firewall_rule(name='fwr1') as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [fr1['firewall_rule']['id']] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} diff --git a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py index 7591f2805c..d4590e8f0f 100644 --- a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py +++ b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py @@ -44,7 +44,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_set_firewall_status(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, admin_state_up= @@ -64,7 +64,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_firewall_deleted(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, admin_state_up=test_db_firewall.ADMIN_STATE_UP, @@ -83,11 +83,12 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_firewall_deleted_error(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] - with self.firewall(firewall_policy_id=fwp_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + with self.firewall( + firewall_policy_id=fwp_id, + admin_state_up=test_db_firewall.ADMIN_STATE_UP, + ) as fw: fw_id = fw['firewall']['id'] res = self.callbacks.firewall_deleted(ctx, fw_id, host='dummy') @@ -98,17 +99,15 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_get_firewall_for_tenant(self): tenant_id = 'test-tenant' ctx = context.Context('', tenant_id) - with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with contextlib.nested(self.firewall_rule(name='fwr1', - tenant_id=tenant_id, - no_delete=True), - self.firewall_rule(name='fwr2', - tenant_id=tenant_id, - no_delete=True), - self.firewall_rule(name='fwr3', - tenant_id=tenant_id, - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1', + tenant_id=tenant_id), + self.firewall_rule(name='fwr2', + tenant_id=tenant_id), + self.firewall_rule(name='fwr3', + tenant_id=tenant_id) + ) as fr: + with self.firewall_policy(tenant_id=tenant_id) as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [r['firewall_rule']['id'] for r in fr] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} @@ -120,8 +119,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + test_db_firewall.ADMIN_STATE_UP) as fw: fw_id = fw['firewall']['id'] res = self.callbacks.get_firewalls_for_tenant(ctx, host='dummy') @@ -136,13 +134,13 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_get_firewall_for_tenant_without_rules(self): tenant_id = 'test-tenant' ctx = context.Context('', tenant_id) - with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp: + with self.firewall_policy(tenant_id=tenant_id) as fwp: fwp_id = fwp['firewall_policy']['id'] attrs = self._get_test_firewall_attrs() attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + admin_state_up=test_db_firewall.ADMIN_STATE_UP + ) as fw: fw_list = [fw['firewall']] f = self.callbacks.get_firewalls_for_tenant_without_rules res = f(ctx, host='dummy') @@ -196,7 +194,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): self.callbacks = self.plugin.callbacks def test_create_second_firewall_not_permitted(self): - with self.firewall(no_delete=True): + with self.firewall(): res = self._create_firewall( None, 'firewall2', description='test', firewall_policy_id=None, admin_state_up=True) @@ -207,7 +205,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -230,7 +228,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -246,7 +244,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -259,9 +257,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): self.assertEqual(res.status_int, 409) def test_update_firewall_rule_fails_when_firewall_pending(self): - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr: + with self.firewall_rule(name='fwr1') as fr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fr_id = fr['firewall_rule']['id'] fw_rule_ids = [fr_id] data = {'firewall_policy': @@ -271,8 +269,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): req.get_response(self.ext_api) with self.firewall(firewall_policy_id=fwp_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True): + test_db_firewall.ADMIN_STATE_UP): data = {'firewall_rule': {'protocol': 'udp'}} req = self.new_update_request('firewall_rules', data, fr_id) @@ -282,8 +279,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): def test_delete_firewall(self): ctx = context.get_admin_context() attrs = self._get_test_firewall_attrs() - - with self.firewall_policy(no_delete=True) as fwp: + # stop the AgentRPC patch for this one to test pending states + self.agentapi_delf_p.stop() + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -298,33 +296,30 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): fw_db = self.plugin._get_firewall(ctx, fw_id) for k, v in attrs.iteritems(): self.assertEqual(fw_db[k], v) + # cleanup the pending firewall + self.plugin.callbacks.firewall_deleted(ctx, fw_id) def test_delete_firewall_after_agent_delete(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, no_delete=True) as fw: fw_id = fw['firewall']['id'] - with ctx.session.begin(subtransactions=True): - req = self.new_delete_request('firewalls', fw_id) - res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, 204) - self.plugin.callbacks.firewall_deleted(ctx, fw_id) - self.assertRaises(firewall.FirewallNotFound, - self.plugin.get_firewall, - ctx, fw_id) + req = self.new_delete_request('firewalls', fw_id) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + self.assertRaises(firewall.FirewallNotFound, + self.plugin.get_firewall, + ctx, fw_id) def test_make_firewall_dict_with_in_place_rules(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [r['firewall_rule']['id'] for r in fr] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} @@ -335,8 +330,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + test_db_firewall.ADMIN_STATE_UP) as fw: fw_id = fw['firewall']['id'] fw_rules = ( self.plugin._make_firewall_dict_with_rules(ctx, @@ -348,13 +342,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): def test_make_firewall_dict_with_in_place_rules_no_policy(self): ctx = context.get_admin_context() - with self.firewall(no_delete=True) as fw: + with self.firewall() as fw: fw_id = fw['firewall']['id'] fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id) self.assertEqual(fw_rules['firewall_rule_list'], []) def test_list_firewalls(self): - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(name='fw1', firewall_policy_id=fwp_id, description='fw') as fwalls: