Merge "NSX|v: refactor FWaaS driver"
This commit is contained in:
commit
753c72ed55
@ -3593,18 +3593,47 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
if router_id:
|
||||
self._update_edge_router(context, router_id)
|
||||
|
||||
def _update_subnets_and_dnat_firewall(self, context, router,
|
||||
def _update_subnets_and_dnat_firewall(self, context, router_db,
|
||||
router_id=None):
|
||||
# Note(asarfaty): If changing the order or names of rules here,
|
||||
# please take care of fwaas _get_other_backend_rules too.
|
||||
fw_rules = []
|
||||
"""Update the router edge firewall with all the relevant rules.
|
||||
|
||||
router_db is the neutron router structure
|
||||
router_id is the id of the actual router that will be updated on
|
||||
the NSX (in case of distributed router it can be plr or tlr)
|
||||
"""
|
||||
if not router_id:
|
||||
router_id = router['id']
|
||||
router_id = router_db['id']
|
||||
|
||||
# Add fw rules if FWaaS is enabled
|
||||
# in case of a distributed-router:
|
||||
# router['id'] is the id of the neutron router (=tlr)
|
||||
# and router_id is the plr/tlr (the one that is being updated)
|
||||
fwaas_rules = None
|
||||
if (self.fwaas_callbacks.should_apply_firewall_to_router(
|
||||
context, router_db, router_id)):
|
||||
fwaas_rules = self.fwaas_callbacks.get_fwaas_rules_for_router(
|
||||
context, router_db['id'])
|
||||
|
||||
self.update_router_firewall(context, router_id, router_db,
|
||||
fwaas_rules=fwaas_rules)
|
||||
|
||||
def update_router_firewall(self, context, router_id, router_db,
|
||||
fwaas_rules=None):
|
||||
"""Recreate all rules in the router edge firewall
|
||||
|
||||
router_db is the neutron router structure
|
||||
router_id is the id of the actual router that will be updated on
|
||||
the NSX (in case of distributed router it can be plr or tlr)
|
||||
if fwaas_rules is not none - this router is attached to a firewall
|
||||
"""
|
||||
fw_rules = []
|
||||
router_with_firewall = True if fwaas_rules is not None else False
|
||||
neutron_id = router_db['id']
|
||||
|
||||
# Add FW rule to open subnets firewall flows and static routes
|
||||
# relative flows
|
||||
subnet_cidrs = self._find_router_subnets_cidrs(context, router['id'])
|
||||
routes = self._get_extra_routes_by_router_id(context, router_id)
|
||||
subnet_cidrs = self._find_router_subnets_cidrs(context, neutron_id)
|
||||
routes = self._get_extra_routes_by_router_id(context, neutron_id)
|
||||
subnet_cidrs.extend([route['destination'] for route in routes])
|
||||
if subnet_cidrs:
|
||||
subnet_fw_rule = {
|
||||
@ -3619,17 +3648,13 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
if self.metadata_proxy_handler:
|
||||
fw_rules += nsx_v_md_proxy.get_router_fw_rules()
|
||||
|
||||
# Add fw rules if FWaaS is enabled
|
||||
router_with_firewall = False
|
||||
if (self.fwaas_callbacks.should_apply_firewall_to_router(
|
||||
context, router, router_id)):
|
||||
fw_rules.extend(self.fwaas_callbacks.get_fwaas_rules_for_router(
|
||||
context, router['id']))
|
||||
router_with_firewall = True
|
||||
# Add FWaaS rules
|
||||
if router_with_firewall and fwaas_rules:
|
||||
fw_rules += fwaas_rules
|
||||
|
||||
if not router_with_firewall:
|
||||
# Add FW rule to open dnat firewall flows
|
||||
_, dnat_rules = self._get_nat_rules(context, router)
|
||||
_, dnat_rules = self._get_nat_rules(context, router_db)
|
||||
dnat_cidrs = [rule['dst'] for rule in dnat_rules]
|
||||
if dnat_cidrs:
|
||||
dnat_fw_rule = {
|
||||
@ -3641,7 +3666,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
|
||||
# Add no-snat rules
|
||||
nosnat_fw_rules = self._get_nosnat_subnets_fw_rules(
|
||||
context, router)
|
||||
context, router_db)
|
||||
fw_rules.extend(nosnat_fw_rules)
|
||||
|
||||
# Get the load balancer rules in case they are refreshed
|
||||
|
@ -22,11 +22,7 @@ from neutron_fwaas.common import exceptions
|
||||
from neutron_fwaas.services.firewall.drivers import fwaas_base
|
||||
|
||||
from vmware_nsx.common import locking
|
||||
from vmware_nsx.plugins.nsx_v.vshield.common import (
|
||||
exceptions as vcns_exc)
|
||||
from vmware_nsx.plugins.nsx_v.vshield import edge_firewall_driver
|
||||
from vmware_nsx.plugins.nsx_v.vshield import edge_utils
|
||||
from vmware_nsx.plugins.nsx_v.vshield import vcns_driver
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
FWAAS_DRIVER_NAME = 'Fwaas NSX-V driver'
|
||||
@ -36,14 +32,17 @@ RULE_NAME_PREFIX = 'Fwaas-'
|
||||
class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
"""NSX-V driver for Firewall As A Service - V1."""
|
||||
|
||||
@property
|
||||
def core_plugin(self):
|
||||
return directory.get_plugin()
|
||||
|
||||
@property
|
||||
def edge_manager(self):
|
||||
return directory.get_plugin().edge_manager
|
||||
return self.core_plugin.edge_manager
|
||||
|
||||
def __init__(self):
|
||||
LOG.debug("Loading FWaaS NsxVDriver.")
|
||||
super(EdgeFwaasDriver, self).__init__()
|
||||
self._nsxv = vcns_driver.VcnsDriver(None)
|
||||
|
||||
def should_apply_firewall_to_router(self, router_data):
|
||||
"""Return True if the firewall rules should be added the router
|
||||
@ -81,7 +80,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
# Get edges for all the routers in the apply list.
|
||||
# note that shared routers are currently not supported
|
||||
edge_manager = self.edge_manager
|
||||
edges = []
|
||||
edges_map = {}
|
||||
for router_info in apply_list:
|
||||
|
||||
# No FWaaS rules needed if there is no external gateway
|
||||
@ -102,8 +101,9 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
# look for the edge id in the DB
|
||||
edge_id = edge_utils.get_router_edge_id(context, lookup_id)
|
||||
if edge_id:
|
||||
edges.append(edge_id)
|
||||
return edges
|
||||
edges_map[router_id] = {'edge_id': edge_id,
|
||||
'lookup_id': lookup_id}
|
||||
return edges_map
|
||||
|
||||
def _translate_rules(self, fwaas_rules):
|
||||
translated_rules = []
|
||||
@ -128,91 +128,29 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
|
||||
return translated_rules
|
||||
|
||||
def _is_allow_external_rule(self, rule):
|
||||
rule_name = rule.get('name', '')
|
||||
if rule_name == edge_firewall_driver.FWAAS_ALLOW_EXT_RULE_NAME:
|
||||
return True
|
||||
# For older routers, the allow-external rule didn't have a name
|
||||
# TODO(asarfaty): delete this in the future
|
||||
if (not rule_name and
|
||||
rule.get('action') == edge_firewall_driver.FWAAS_ALLOW and
|
||||
rule.get('destination_vnic_groups', []) == ['external']):
|
||||
return True
|
||||
return False
|
||||
def _set_rules_on_router_edge(self, context, router_id, neutron_id,
|
||||
edge_id, fw_id, translated_rules,
|
||||
delete_fw=False):
|
||||
"""Recreate router edge firewall rules
|
||||
|
||||
def _get_other_backend_rules(self, context, edge_id):
|
||||
"""Get a list of current backend rules from other applications
|
||||
|
||||
Those rules should stay on the backend firewall, when updating the
|
||||
Using the plugin code to recreate all the rules with the additional
|
||||
FWaaS rules.
|
||||
|
||||
Return it as 2 separate lists of rules, which should go before/after
|
||||
the fwaas rules.
|
||||
router_id is the is of the router about to be updated
|
||||
(in case of distributed router - the plr)
|
||||
neutron_id is the neutron router id
|
||||
"""
|
||||
try:
|
||||
backend_fw = self._nsxv.get_firewall(context, edge_id)
|
||||
backend_rules = backend_fw['firewall_rule_list']
|
||||
except vcns_exc.VcnsApiException:
|
||||
# Need to create a new one
|
||||
backend_rules = []
|
||||
# remove old FWaaS rules from the rules list.
|
||||
# also delete the allow-external rule, if it is there.
|
||||
# If necessary - we will add it again later
|
||||
before_rules = []
|
||||
after_rules = []
|
||||
go_after = False
|
||||
for rule_item in backend_rules:
|
||||
rule = rule_item['firewall_rule']
|
||||
rule_name = rule.get('name', '')
|
||||
fwaas_rule = rule_name.startswith(RULE_NAME_PREFIX)
|
||||
if fwaas_rule:
|
||||
# reached the fwaas part, the rest of the rules should be
|
||||
# in the 'after' list
|
||||
go_after = True
|
||||
if (rule_name == edge_firewall_driver.DNAT_RULE_NAME or
|
||||
rule_name == edge_firewall_driver.NO_SNAT_RULE_NAME):
|
||||
# passed the fwaas part, the rest of the rules should be
|
||||
# in the 'after' list
|
||||
go_after = True
|
||||
if (not fwaas_rule and
|
||||
not self._is_allow_external_rule(rule)):
|
||||
if go_after:
|
||||
after_rules.append(rule)
|
||||
else:
|
||||
before_rules.append(rule)
|
||||
|
||||
return before_rules, after_rules
|
||||
|
||||
def _set_rules_on_edge(self, context, edge_id, fw_id, translated_rules,
|
||||
allow_external=False):
|
||||
"""delete old FWaaS rules from the Edge, and add new ones
|
||||
|
||||
Note that the edge might have other FW rules like NAT or LBaas
|
||||
that should remain there.
|
||||
|
||||
allow_external is usually False because it shouldn't exist with a
|
||||
firewall. It should only be True when the firewall is being deleted.
|
||||
"""
|
||||
# Get the existing backend rules which do not belong to FWaaS
|
||||
backend_rules, after_rules = self._get_other_backend_rules(
|
||||
context, edge_id)
|
||||
|
||||
# add new FWaaS rules at the correct location by their original order
|
||||
backend_rules.extend(translated_rules)
|
||||
backend_rules.extend(after_rules)
|
||||
|
||||
# update the backend
|
||||
router_db = self.core_plugin._get_router(context, neutron_id)
|
||||
try:
|
||||
with locking.LockManager.get_lock(str(edge_id)):
|
||||
self._nsxv.update_firewall(
|
||||
edge_id,
|
||||
{'firewall_rule_list': backend_rules},
|
||||
context,
|
||||
allow_external=allow_external)
|
||||
self.core_plugin.update_router_firewall(
|
||||
context, router_id, router_db,
|
||||
fwaas_rules=translated_rules)
|
||||
except Exception as e:
|
||||
# catch known library exceptions and raise Fwaas generic exception
|
||||
LOG.error("Failed to update backend firewall %(fw)s: "
|
||||
"%(e)s", {'e': e, 'fw': fw_id})
|
||||
LOG.error("Failed to update firewall %(fw)s on edge %(edge_id)s: "
|
||||
"%(e)s", {'e': e, 'fw': fw_id, 'edge_id': edge_id})
|
||||
raise exceptions.FirewallInternalDriverError(
|
||||
driver=FWAAS_DRIVER_NAME)
|
||||
|
||||
@ -222,20 +160,27 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
self.apply_default_policy(agent_mode, apply_list, firewall)
|
||||
return
|
||||
|
||||
# get router-edge mapping
|
||||
context = n_context.get_admin_context()
|
||||
|
||||
# Find out the relevant edges
|
||||
router_edges = self._get_routers_edges(context, apply_list)
|
||||
if not router_edges:
|
||||
LOG.warning("Cannot apply the firewall to any of the routers %s",
|
||||
apply_list)
|
||||
edges_map = self._get_routers_edges(context, apply_list)
|
||||
if not edges_map:
|
||||
routers = [r.router_id for r in apply_list]
|
||||
LOG.warning("Cannot apply the firewall %(fw)s to any of the "
|
||||
"routers %(rtrs)s",
|
||||
{'fw': firewall['id'], 'rtrs': routers})
|
||||
return
|
||||
|
||||
# Translate the FWaaS rules
|
||||
rules = self._translate_rules(firewall['firewall_rule_list'])
|
||||
# update each edge
|
||||
for edge_id in router_edges:
|
||||
self._set_rules_on_edge(
|
||||
context, edge_id, firewall['id'], rules)
|
||||
|
||||
# update each relevant edge with the new rules
|
||||
for router_info in apply_list:
|
||||
neutron_id = router_info.router_id
|
||||
info = edges_map.get(neutron_id)
|
||||
if info:
|
||||
self._set_rules_on_router_edge(
|
||||
context, info['lookup_id'], neutron_id, info['edge_id'],
|
||||
firewall['id'], rules)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def create_firewall(self, agent_mode, apply_list, firewall):
|
||||
@ -248,13 +193,22 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
self._create_or_update_firewall(agent_mode, apply_list, firewall)
|
||||
|
||||
def _delete_firewall_or_set_default_policy(self, apply_list, firewall,
|
||||
allow_external):
|
||||
delete_fw=False):
|
||||
# get router-edge mapping
|
||||
context = n_context.get_admin_context()
|
||||
router_edges = self._get_routers_edges(context, apply_list)
|
||||
if router_edges:
|
||||
for edge_id in router_edges:
|
||||
self._set_rules_on_edge(context, edge_id, firewall['id'], [],
|
||||
allow_external=allow_external)
|
||||
edges_map = self._get_routers_edges(context, apply_list)
|
||||
|
||||
# if the firewall is deleted, rules should be None
|
||||
rules = None if delete_fw else []
|
||||
|
||||
# Go over all routers and update them on backend
|
||||
for router_info in apply_list:
|
||||
neutron_id = router_info.router_id
|
||||
info = edges_map.get(neutron_id)
|
||||
if info:
|
||||
self._set_rules_on_router_edge(
|
||||
context, info['lookup_id'], neutron_id, info['edge_id'],
|
||||
firewall['id'], rules, delete_fw=delete_fw)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def delete_firewall(self, agent_mode, apply_list, firewall):
|
||||
@ -264,7 +218,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
And add the default allow-external rule.
|
||||
"""
|
||||
self._delete_firewall_or_set_default_policy(apply_list, firewall,
|
||||
allow_external=True)
|
||||
delete_fw=True)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def apply_default_policy(self, agent_mode, apply_list, firewall):
|
||||
@ -274,7 +228,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
so we only need to delete the current rules.
|
||||
"""
|
||||
self._delete_firewall_or_set_default_policy(apply_list, firewall,
|
||||
allow_external=False)
|
||||
delete_fw=False)
|
||||
|
||||
def get_firewall_translated_rules(self, firewall):
|
||||
if firewall['admin_state_up']:
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import copy
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron_fwaas.common import exceptions
|
||||
|
||||
@ -28,8 +29,6 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
||||
def setUp(self):
|
||||
super(NsxvFwaasTestCase, self).setUp()
|
||||
self.firewall = edge_fwaas_driver.EdgeFwaasDriver()
|
||||
self.firewall._get_routers_edges = mock.Mock()
|
||||
self.firewall._get_routers_edges.return_value = ['edge-1']
|
||||
|
||||
def _fake_rules_v4(self):
|
||||
rule1 = {'enabled': True,
|
||||
@ -81,40 +80,60 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
||||
def _fake_apply_list(self, router_count=1):
|
||||
apply_list = []
|
||||
while router_count > 0:
|
||||
router_inst = {}
|
||||
rtr_id = uuidutils.generate_uuid()
|
||||
router_inst = {'id': rtr_id}
|
||||
router_info_inst = mock.Mock()
|
||||
router_info_inst.router = router_inst
|
||||
router_info_inst.router_id = rtr_id
|
||||
apply_list.append(router_info_inst)
|
||||
router_count -= 1
|
||||
return apply_list
|
||||
|
||||
def _get_fake_mapping(self, apply_list):
|
||||
router_edge_map = {}
|
||||
for router_info in apply_list:
|
||||
router_edge_map[router_info.router_id] = {
|
||||
'edge_id': 'edge-1',
|
||||
'lookup_id': router_info.router_id}
|
||||
return router_edge_map
|
||||
|
||||
def _setup_firewall_with_rules(self, func, router_count=1):
|
||||
apply_list = self._fake_apply_list(router_count=router_count)
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall(rule_list)
|
||||
edges = ['edge-1'] * router_count
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw,\
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
func('nsx', apply_list, firewall)
|
||||
self.assertEqual(router_count, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[-1].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual(len(rule_list), len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
|
||||
def test_create_firewall_no_rules(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
firewall = self._fake_firewall_no_rule()
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual([], backend_rules)
|
||||
|
||||
def test_create_firewall_with_rules(self):
|
||||
self._setup_firewall_with_rules(self.firewall.create_firewall)
|
||||
@ -126,63 +145,42 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
||||
def test_update_firewall_with_rules(self):
|
||||
self._setup_firewall_with_rules(self.firewall.update_firewall)
|
||||
|
||||
def test_create_firewall_with_existing_backend_rules(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall(rule_list)
|
||||
edge_id = 'edge-1'
|
||||
# backend fw already has 3 rules.
|
||||
# The fwaas rule should come in the middle, and replace the fwaas rule
|
||||
existing_rules = [{'name': 'Subnet Rule',
|
||||
'action': 'accept',
|
||||
'enabled': True},
|
||||
{'name': 'Fwaas-1',
|
||||
'action': 'deny',
|
||||
'enabled': True},
|
||||
{'name': 'DNAT Rule',
|
||||
'action': 'accept',
|
||||
'enabled': True}]
|
||||
for rule in existing_rules:
|
||||
self.firewall._nsxv.vcns.add_firewall_rule(edge_id, rule)
|
||||
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw,\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=[edge_id]):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(len(rule_list) + len(existing_rules) - 1,
|
||||
len(backend_rules))
|
||||
self.assertEqual('Subnet Rule', backend_rules[0]['name'])
|
||||
self.assertEqual('DNAT Rule', backend_rules[-1]['name'])
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
|
||||
def test_delete_firewall(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
firewall = self._fake_firewall_no_rule()
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.delete_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(True, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertIsNone(backend_rules)
|
||||
|
||||
def test_create_firewall_with_admin_down(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall_with_admin_down(rule_list)
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual([], backend_rules)
|
||||
|
||||
def test_should_apply_firewall_to_router(self):
|
||||
router = {'id': 'fake_id',
|
||||
|
Loading…
x
Reference in New Issue
Block a user