From 061f27b489abe9c24b13eb095f2d9f25c2e8832b Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Tue, 4 Mar 2014 03:27:11 -0800 Subject: [PATCH] Mock agent RPC for FWaaS tests to delete DB objs This changes the firewall service unit tests to mock the RPC calls from the firewall service to the agent. This allows the tests to fake the agent response RPC that removes the firewall from the DB so the all of the other objects that the firewall depends on can be deleted. Closes-Bug: #1288441 Change-Id: I32462ab5557b9c52328bf6a23a12efc2d799644c --- .../unit/db/firewall/test_db_firewall.py | 230 +++++++++--------- .../services/firewall/test_fwaas_plugin.py | 102 ++++---- 2 files changed, 162 insertions(+), 170 deletions(-) diff --git a/neutron/tests/unit/db/firewall/test_db_firewall.py b/neutron/tests/unit/db/firewall/test_db_firewall.py index aa7f3efa6f..010aa9d1ef 100644 --- a/neutron/tests/unit/db/firewall/test_db_firewall.py +++ b/neutron/tests/unit/db/firewall/test_db_firewall.py @@ -20,6 +20,7 @@ import contextlib import logging +import mock import webob.exc from neutron.api import extensions as api_ext @@ -28,9 +29,11 @@ from neutron import context from neutron.db.firewall import firewall_db as fdb import neutron.extensions from neutron.extensions import firewall +from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import uuidutils from neutron.plugins.common import constants +from neutron.services.firewall import fwaas_plugin from neutron.tests.unit import test_db_plugin @@ -38,6 +41,8 @@ LOG = logging.getLogger(__name__) DB_FW_PLUGIN_KLASS = ( "neutron.db.firewall.firewall_db.Firewall_db_mixin" ) +FWAAS_PLUGIN = 'neutron.services.firewall.fwaas_plugin' +DELETEFW_PATH = FWAAS_PLUGIN + '.FirewallAgentApi.delete_firewall' extensions_path = ':'.join(neutron.extensions.__path__) DESCRIPTION = 'default description' SHARED = True @@ -53,6 +58,23 @@ ENABLED = True ADMIN_STATE_UP = True +class FakeAgentApi(fwaas_plugin.FirewallCallbacks): + """ + This class used to mock the AgentAPI delete method inherits from + FirewallCallbacks because it needs access to the firewall_deleted method. + The delete_firewall method belongs to the FirewallAgentApi, which has + no access to the firewall_deleted method normally because it's not + responsible for deleting the firewall from the DB. However, it needs + to in the unit tests since there is no agent to call back. + """ + def __init__(self): + pass + + def delete_firewall(self, context, firewall, **kwargs): + self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL'] + self.firewall_deleted(context, firewall['id'], **kwargs) + + class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): resource_prefix_map = dict( (k, constants.COMMON_PREFIXES[constants.FIREWALL]) @@ -60,6 +82,9 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): ) def setUp(self, core_plugin=None, fw_plugin=None, ext_mgr=None): + self.agentapi_delf_p = mock.patch(DELETEFW_PATH, create=True, + new=FakeAgentApi().delete_firewall) + self.agentapi_delf_p.start() if not fw_plugin: fw_plugin = DB_FW_PLUGIN_KLASS service_plugins = {'fw_plugin_name': fw_plugin} @@ -304,18 +329,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall_policy1" attrs = self._get_test_firewall_policy_attrs(name) - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] attrs['firewall_rules'] = fw_rule_ids with self.firewall_policy(name=name, shared=SHARED, firewall_rules=fw_rule_ids, - audited=AUDITED, - no_delete=True) as fwp: + audited=AUDITED) as fwp: for k, v in attrs.iteritems(): self.assertEqual(fwp['firewall_policy'][k], v) @@ -372,13 +393,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_with_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] attrs['firewall_rules'] = fw_rule_ids data = {'firewall_policy': @@ -394,26 +412,25 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_replace_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4')) as frs: + fr1 = frs[0:2] + fr2 = frs[2:4] + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} req = self.new_update_request('firewall_policies', data, fwp['firewall_policy']['id']) req.get_response(self.ext_api) - with contextlib.nested(self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True)) as fr2: + fw_rule_ids = [r['firewall_rule']['id'] for r in fr2] attrs['firewall_rules'] = fw_rule_ids - data = {'firewall_policy': - {'firewall_rules': fw_rule_ids}} - req = self.new_update_request('firewall_policies', data, + new_data = {'firewall_policy': + {'firewall_rules': fw_rule_ids}} + req = self.new_update_request('firewall_policies', new_data, fwp['firewall_policy']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) @@ -424,15 +441,11 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_reorder_rules(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [fr[2]['firewall_rule']['id'], fr[3]['firewall_rule']['id']] data = {'firewall_policy': @@ -472,11 +485,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_policy_with_non_existing_rule(self): attrs = self._get_test_firewall_policy_attrs() - with self.firewall_policy() as fwp: - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2')) as fr: + with self.firewall_policy() as fwp: fw_rule_ids = [r['firewall_rule']['id'] for r in fr] # appending non-existent rule fw_rule_ids.append(uuidutils.generate_uuid()) @@ -512,7 +523,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() with self.firewall_policy(no_delete=True) as fwp: fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr: + with self.firewall_rule(name='fwr1') as fr: fr_id = fr['firewall_rule']['id'] fw_rule_ids = [fr_id] attrs['firewall_rules'] = fw_rule_ids @@ -534,7 +545,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall_policy_with_firewall_association(self): attrs = self._get_test_firewall_attrs() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -585,10 +596,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_show_firewall_rule_with_fw_policy_associated(self): attrs = self._get_test_firewall_rule_attrs() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fw_rule: + with self.firewall_rule() as fw_rule: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id data = {'firewall_policy': {'firewall_rules': [fw_rule['firewall_rule']['id']]}} @@ -604,12 +615,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): self.assertEqual(res['firewall_rule'][k], v) def test_list_firewall_rules(self): - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: query_params = 'protocol=tcp' self._test_list_resources('firewall_rule', fr, query_params=query_params) @@ -620,7 +628,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10:20' attrs['destination_port'] = '30:40' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': '10:20', 'destination_port': '30:40'}} @@ -633,7 +641,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10000' attrs['destination_port'] = '80' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': 10000, 'destination_port': 80}} @@ -646,7 +654,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = '10000' attrs['destination_port'] = '80' - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': '10000', 'destination_port': '80'}} @@ -659,7 +667,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs['source_port'] = None attrs['destination_port'] = None - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: data = {'firewall_rule': {'name': name, 'source_port': None, 'destination_port': None}} @@ -673,10 +681,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_update_firewall_rule_with_policy_associated(self): name = "new_firewall_rule1" attrs = self._get_test_firewall_rule_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id fwr_id = fwr['firewall_rule']['id'] data = {'firewall_policy': {'firewall_rules': [fwr_id]}} req = self.new_update_request('firewall_policies', data, @@ -712,10 +720,10 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall_rule_with_policy_associated(self): attrs = self._get_test_firewall_rule_attrs() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['firewall_policy_id'] = fwp_id - with self.firewall_rule(no_delete=True) as fwr: + with self.firewall_rule() as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['firewall_policy_id'] = fwp_id fwr_id = fwr['firewall_rule']['id'] data = {'firewall_policy': {'firewall_rules': [fwr_id]}} req = self.new_update_request('firewall_policies', data, @@ -729,7 +737,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(name=name, @@ -743,7 +751,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): name = "firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(name=name, @@ -759,7 +767,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): self.assertEqual(res['firewall'][k], v) def test_list_firewalls(self): - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with contextlib.nested(self.firewall(name='fw1', firewall_policy_id=fwp_id, @@ -793,7 +801,7 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): def test_delete_firewall(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, no_delete=True) as fw: @@ -809,13 +817,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as frs: + fr1 = frs[0:2] + fwr3 = frs[2] + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] attrs['firewall_rules'] = fw_rule_ids[:] data = {'firewall_policy': @@ -828,19 +837,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): insert_after=None, expected_code=webob.exc.HTTPConflict.code, expected_body=None) - with self.firewall_rule(name='fwr3', no_delete=True) as fwr3: - fwr3_id = fwr3['firewall_rule']['id'] - attrs['firewall_rules'].insert(0, fwr3_id) - self._rule_action('insert', fwp_id, fwr3_id, - insert_before=fw_rule_ids[0], - insert_after=None, - expected_code=webob.exc.HTTPOk.code, - expected_body=attrs) + fwr3_id = fwr3['firewall_rule']['id'] + attrs['firewall_rules'].insert(0, fwr3_id) + self._rule_action('insert', fwp_id, fwr3_id, + insert_before=fw_rule_ids[0], + insert_after=None, + expected_code=webob.exc.HTTPOk.code, + expected_body=attrs) def test_insert_rule_in_policy_failures(self): - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr1: + with self.firewall_rule(name='fwr1') as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fr1_id = fr1['firewall_rule']['id'] fw_rule_ids = [fr1_id] data = {'firewall_policy': @@ -895,23 +903,16 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr0', - no_delete=True), - self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True), - self.firewall_rule(name='fwr4', - no_delete=True), - self.firewall_rule(name='fwr5', - no_delete=True), - self.firewall_rule(name='fwr6', - no_delete=True)) as fwr: + with contextlib.nested(self.firewall_rule(name='fwr0'), + self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3'), + self.firewall_rule(name='fwr4'), + self.firewall_rule(name='fwr5'), + self.firewall_rule(name='fwr6')) as fwr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id # test insert when rule list is empty fwr0_id = fwr[0]['firewall_rule']['id'] attrs['firewall_rules'].insert(0, fwr0_id) @@ -976,15 +977,12 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): attrs = self._get_test_firewall_policy_attrs() attrs['audited'] = False attrs['firewall_list'] = [] - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - attrs['id'] = fwp_id - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr1: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] attrs['firewall_rules'] = fw_rule_ids[:] data = {'firewall_policy': @@ -1014,9 +1012,9 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): expected_body=None) def test_remove_rule_from_policy_failures(self): - with self.firewall_policy() as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr1: + with self.firewall_rule(name='fwr1') as fr1: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [fr1['firewall_rule']['id']] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} diff --git a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py index 7591f2805c..d4590e8f0f 100644 --- a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py +++ b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py @@ -44,7 +44,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_set_firewall_status(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, admin_state_up= @@ -64,7 +64,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_firewall_deleted(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, admin_state_up=test_db_firewall.ADMIN_STATE_UP, @@ -83,11 +83,12 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_firewall_deleted_error(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] - with self.firewall(firewall_policy_id=fwp_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + with self.firewall( + firewall_policy_id=fwp_id, + admin_state_up=test_db_firewall.ADMIN_STATE_UP, + ) as fw: fw_id = fw['firewall']['id'] res = self.callbacks.firewall_deleted(ctx, fw_id, host='dummy') @@ -98,17 +99,15 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_get_firewall_for_tenant(self): tenant_id = 'test-tenant' ctx = context.Context('', tenant_id) - with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with contextlib.nested(self.firewall_rule(name='fwr1', - tenant_id=tenant_id, - no_delete=True), - self.firewall_rule(name='fwr2', - tenant_id=tenant_id, - no_delete=True), - self.firewall_rule(name='fwr3', - tenant_id=tenant_id, - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1', + tenant_id=tenant_id), + self.firewall_rule(name='fwr2', + tenant_id=tenant_id), + self.firewall_rule(name='fwr3', + tenant_id=tenant_id) + ) as fr: + with self.firewall_policy(tenant_id=tenant_id) as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [r['firewall_rule']['id'] for r in fr] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} @@ -120,8 +119,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + test_db_firewall.ADMIN_STATE_UP) as fw: fw_id = fw['firewall']['id'] res = self.callbacks.get_firewalls_for_tenant(ctx, host='dummy') @@ -136,13 +134,13 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def test_get_firewall_for_tenant_without_rules(self): tenant_id = 'test-tenant' ctx = context.Context('', tenant_id) - with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp: + with self.firewall_policy(tenant_id=tenant_id) as fwp: fwp_id = fwp['firewall_policy']['id'] attrs = self._get_test_firewall_attrs() attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + admin_state_up=test_db_firewall.ADMIN_STATE_UP + ) as fw: fw_list = [fw['firewall']] f = self.callbacks.get_firewalls_for_tenant_without_rules res = f(ctx, host='dummy') @@ -196,7 +194,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): self.callbacks = self.plugin.callbacks def test_create_second_firewall_not_permitted(self): - with self.firewall(no_delete=True): + with self.firewall(): res = self._create_firewall( None, 'firewall2', description='test', firewall_policy_id=None, admin_state_up=True) @@ -207,7 +205,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -230,7 +228,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -246,7 +244,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): name = "new_firewall1" attrs = self._get_test_firewall_attrs(name) - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -259,9 +257,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): self.assertEqual(res.status_int, 409) def test_update_firewall_rule_fails_when_firewall_pending(self): - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with self.firewall_rule(name='fwr1', no_delete=True) as fr: + with self.firewall_rule(name='fwr1') as fr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fr_id = fr['firewall_rule']['id'] fw_rule_ids = [fr_id] data = {'firewall_policy': @@ -271,8 +269,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): req.get_response(self.ext_api) with self.firewall(firewall_policy_id=fwp_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True): + test_db_firewall.ADMIN_STATE_UP): data = {'firewall_rule': {'protocol': 'udp'}} req = self.new_update_request('firewall_rules', data, fr_id) @@ -282,8 +279,9 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): def test_delete_firewall(self): ctx = context.get_admin_context() attrs = self._get_test_firewall_attrs() - - with self.firewall_policy(no_delete=True) as fwp: + # stop the AgentRPC patch for this one to test pending states + self.agentapi_delf_p.stop() + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, @@ -298,33 +296,30 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): fw_db = self.plugin._get_firewall(ctx, fw_id) for k, v in attrs.iteritems(): self.assertEqual(fw_db[k], v) + # cleanup the pending firewall + self.plugin.callbacks.firewall_deleted(ctx, fw_id) def test_delete_firewall_after_agent_delete(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(firewall_policy_id=fwp_id, no_delete=True) as fw: fw_id = fw['firewall']['id'] - with ctx.session.begin(subtransactions=True): - req = self.new_delete_request('firewalls', fw_id) - res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, 204) - self.plugin.callbacks.firewall_deleted(ctx, fw_id) - self.assertRaises(firewall.FirewallNotFound, - self.plugin.get_firewall, - ctx, fw_id) + req = self.new_delete_request('firewalls', fw_id) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + self.assertRaises(firewall.FirewallNotFound, + self.plugin.get_firewall, + ctx, fw_id) def test_make_firewall_dict_with_in_place_rules(self): ctx = context.get_admin_context() - with self.firewall_policy(no_delete=True) as fwp: - fwp_id = fwp['firewall_policy']['id'] - with contextlib.nested(self.firewall_rule(name='fwr1', - no_delete=True), - self.firewall_rule(name='fwr2', - no_delete=True), - self.firewall_rule(name='fwr3', - no_delete=True)) as fr: + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr: + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] fw_rule_ids = [r['firewall_rule']['id'] for r in fr] data = {'firewall_policy': {'firewall_rules': fw_rule_ids}} @@ -335,8 +330,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): attrs['firewall_policy_id'] = fwp_id with self.firewall(firewall_policy_id=fwp_id, admin_state_up= - test_db_firewall.ADMIN_STATE_UP, - no_delete=True) as fw: + test_db_firewall.ADMIN_STATE_UP) as fw: fw_id = fw['firewall']['id'] fw_rules = ( self.plugin._make_firewall_dict_with_rules(ctx, @@ -348,13 +342,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): def test_make_firewall_dict_with_in_place_rules_no_policy(self): ctx = context.get_admin_context() - with self.firewall(no_delete=True) as fw: + with self.firewall() as fw: fw_id = fw['firewall']['id'] fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id) self.assertEqual(fw_rules['firewall_rule_list'], []) def test_list_firewalls(self): - with self.firewall_policy(no_delete=True) as fwp: + with self.firewall_policy() as fwp: fwp_id = fwp['firewall_policy']['id'] with self.firewall(name='fw1', firewall_policy_id=fwp_id, description='fw') as fwalls: