diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index 5afdc0989f..b7b59bbf1d 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -19,6 +19,7 @@ from oslo.config import cfg +from neutron.common import exceptions as n_exception from neutron.common import rpc as q_rpc from neutron.common import topics from neutron import context as neutron_context @@ -127,6 +128,17 @@ class FirewallAgentApi(proxy.RpcProxy): ) +class FirewallCountExceeded(n_exception.NeutronException): + + """Reference implementation specific exception for firewall count. + + Only one firewall is supported per tenant. When a second + firewall is tried to be created, this exception will be raised. + """ + message = _("Exceeded allowed count of firewalls for tenant " + "%(tenant_id)s. Only one firewall is supported per tenant.") + + class FirewallPlugin(firewall_db.Firewall_db_mixin): """Implementation of the Neutron Firewall Service Plugin. @@ -209,6 +221,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin): def create_firewall(self, context, firewall): LOG.debug(_("create_firewall() called")) + tenant_id = self._get_tenant_id_for_create(context, + firewall['firewall']) + fw_count = self.get_firewalls_count(context) + if fw_count: + raise FirewallCountExceeded(tenant_id=tenant_id) firewall['firewall']['status'] = const.PENDING_CREATE fw = super(FirewallPlugin, self).create_firewall(context, firewall) fw_with_rules = ( diff --git a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py index 4d0f53383a..4d02639f65 100644 --- a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py +++ b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py @@ -140,16 +140,10 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): fwp_id = fwp['firewall_policy']['id'] attrs = self._get_test_firewall_attrs() attrs['firewall_policy_id'] = fwp_id - with contextlib.nested(self.firewall( - firewall_policy_id=fwp_id, - tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True), self.firewall( - firewall_policy_id=fwp_id, - tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True)) as fws: - fw_list = [fw['firewall'] for fw in fws] + with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, + admin_state_up=test_db_firewall.ADMIN_STATE_UP, + no_delete=True) as fw: + fw_list = [fw['firewall']] f = self.callbacks.get_firewalls_for_tenant_without_rules res = f(ctx, host='dummy') for fw in res: @@ -202,6 +196,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS) self.callbacks = self.plugin.callbacks + def test_create_second_firewall_not_permitted(self): + with self.firewall(no_delete=True): + res = self._create_firewall( + None, 'firewall2', description='test', + firewall_policy_id=None, admin_state_up=True) + self.assertEqual(res.status_int, 500) + def test_update_firewall(self): ctx = context.get_admin_context() name = "new_firewall1" @@ -352,3 +353,11 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): fw_id = fw['firewall']['id'] fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id) self.assertEqual(fw_rules['firewall_rule_list'], []) + + def test_list_firewalls(self): + with self.firewall_policy(no_delete=True) as fwp: + fwp_id = fwp['firewall_policy']['id'] + with self.firewall(name='fw1', firewall_policy_id=fwp_id, + description='fw') as fwalls: + self._test_list_resources('firewall', [fwalls], + query_params='description=fw')