Merge "NSX|P: Add scope to security group rules"
This commit is contained in:
commit
6cf3ae038e
@ -1705,6 +1705,8 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
|
|
||||||
# create default section and rules
|
# create default section and rules
|
||||||
logged = cfg.CONF.nsx_p.log_security_groups_blocked_traffic
|
logged = cfg.CONF.nsx_p.log_security_groups_blocked_traffic
|
||||||
|
scope = [self.nsxpolicy.group.get_path(
|
||||||
|
NSX_P_GLOBAL_DOMAIN_ID, NSX_P_DEFAULT_GROUP)]
|
||||||
rule_id = 1
|
rule_id = 1
|
||||||
dhcp_client_rule = self.nsxpolicy.comm_map.build_entry(
|
dhcp_client_rule = self.nsxpolicy.comm_map.build_entry(
|
||||||
'DHCP Reply', NSX_P_GLOBAL_DOMAIN_ID,
|
'DHCP Reply', NSX_P_GLOBAL_DOMAIN_ID,
|
||||||
@ -1712,8 +1714,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
rule_id, sequence_number=rule_id,
|
rule_id, sequence_number=rule_id,
|
||||||
service_ids=['DHCP-Client'],
|
service_ids=['DHCP-Client'],
|
||||||
action=policy_constants.ACTION_ALLOW,
|
action=policy_constants.ACTION_ALLOW,
|
||||||
source_groups=None,
|
scope=scope,
|
||||||
dest_groups=[NSX_P_DEFAULT_GROUP],
|
|
||||||
direction=nsxlib_consts.IN,
|
direction=nsxlib_consts.IN,
|
||||||
logged=logged)
|
logged=logged)
|
||||||
rule_id += 1
|
rule_id += 1
|
||||||
@ -1723,8 +1724,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
rule_id, sequence_number=rule_id,
|
rule_id, sequence_number=rule_id,
|
||||||
service_ids=['DHCP-Server'],
|
service_ids=['DHCP-Server'],
|
||||||
action=policy_constants.ACTION_ALLOW,
|
action=policy_constants.ACTION_ALLOW,
|
||||||
source_groups=[NSX_P_DEFAULT_GROUP],
|
scope=scope,
|
||||||
dest_groups=None,
|
|
||||||
direction=nsxlib_consts.OUT,
|
direction=nsxlib_consts.OUT,
|
||||||
logged=logged)
|
logged=logged)
|
||||||
rule_id += 1
|
rule_id += 1
|
||||||
@ -1733,8 +1733,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
NSX_P_DEFAULT_SECTION,
|
NSX_P_DEFAULT_SECTION,
|
||||||
rule_id, sequence_number=rule_id, service_ids=None,
|
rule_id, sequence_number=rule_id, service_ids=None,
|
||||||
action=policy_constants.ACTION_DENY,
|
action=policy_constants.ACTION_DENY,
|
||||||
source_groups=None,
|
scope=scope,
|
||||||
dest_groups=[NSX_P_DEFAULT_GROUP],
|
|
||||||
direction=nsxlib_consts.IN_OUT,
|
direction=nsxlib_consts.IN_OUT,
|
||||||
logged=logged)
|
logged=logged)
|
||||||
rules = [dhcp_client_rule, dhcp_server_rule, block_rule]
|
rules = [dhcp_client_rule, dhcp_server_rule, block_rule]
|
||||||
@ -1907,6 +1906,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
service = self._get_rule_service_id(context, sg_rule, tags)
|
service = self._get_rule_service_id(context, sg_rule, tags)
|
||||||
logging = (cfg.CONF.nsx_p.log_security_groups_allowed_traffic or
|
logging = (cfg.CONF.nsx_p.log_security_groups_allowed_traffic or
|
||||||
secgroup_logging)
|
secgroup_logging)
|
||||||
|
scope = [self.nsxpolicy.group.get_path(domain_id, this_group_id)]
|
||||||
self.nsxpolicy.comm_map.create_entry(
|
self.nsxpolicy.comm_map.create_entry(
|
||||||
nsx_name, domain_id, map_id, entry_id=sg_rule['id'],
|
nsx_name, domain_id, map_id, entry_id=sg_rule['id'],
|
||||||
description=sg_rule.get('description'),
|
description=sg_rule.get('description'),
|
||||||
@ -1914,6 +1914,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
|||||||
action=policy_constants.ACTION_ALLOW,
|
action=policy_constants.ACTION_ALLOW,
|
||||||
source_groups=[source] if source else None,
|
source_groups=[source] if source else None,
|
||||||
dest_groups=[destination] if destination else None,
|
dest_groups=[destination] if destination else None,
|
||||||
|
scope=scope,
|
||||||
direction=direction, logged=logging)
|
direction=direction, logged=logging)
|
||||||
|
|
||||||
def _create_project_domain(self, context, project_id):
|
def _create_project_domain(self, context, project_id):
|
||||||
|
@ -989,6 +989,125 @@ class NsxPTestPorts(test_db_base_plugin_v2.TestPortsV2,
|
|||||||
self.assertEqual(exc.HTTPBadRequest.code,
|
self.assertEqual(exc.HTTPBadRequest.code,
|
||||||
res.status_int)
|
res.status_int)
|
||||||
|
|
||||||
|
def _test_create_direct_network(self, vlan_id=0):
|
||||||
|
net_type = vlan_id and 'vlan' or 'flat'
|
||||||
|
name = 'direct_net'
|
||||||
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
||||||
|
pnet.PHYSICAL_NETWORK: 'tzuuid'}
|
||||||
|
if vlan_id:
|
||||||
|
providernet_args[pnet.SEGMENTATION_ID] = vlan_id
|
||||||
|
|
||||||
|
mock_tt = mock.patch('vmware_nsxlib.v3.policy'
|
||||||
|
'.core_resources.NsxPolicyTransportZoneApi'
|
||||||
|
'.get_transport_type',
|
||||||
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN)
|
||||||
|
mock_tt.start()
|
||||||
|
return self.network(name=name,
|
||||||
|
providernet_args=providernet_args,
|
||||||
|
arg_list=(pnet.NETWORK_TYPE,
|
||||||
|
pnet.PHYSICAL_NETWORK,
|
||||||
|
pnet.SEGMENTATION_ID))
|
||||||
|
|
||||||
|
def _test_create_port_vnic_direct(self, vlan_id):
|
||||||
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
||||||
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
||||||
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN),\
|
||||||
|
self._test_create_direct_network(vlan_id=vlan_id) as network:
|
||||||
|
# Check that port security conflicts
|
||||||
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
||||||
|
psec.PORTSECURITY: True}
|
||||||
|
net_id = network['network']['id']
|
||||||
|
res = self._create_port(self.fmt, net_id=net_id,
|
||||||
|
arg_list=(portbindings.VNIC_TYPE,
|
||||||
|
psec.PORTSECURITY),
|
||||||
|
**kwargs)
|
||||||
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||||
|
|
||||||
|
# Check that security group conflicts
|
||||||
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
||||||
|
'security_groups': [
|
||||||
|
'4cd70774-cc67-4a87-9b39-7d1db38eb087'],
|
||||||
|
psec.PORTSECURITY: False}
|
||||||
|
net_id = network['network']['id']
|
||||||
|
res = self._create_port(self.fmt, net_id=net_id,
|
||||||
|
arg_list=(portbindings.VNIC_TYPE,
|
||||||
|
psec.PORTSECURITY),
|
||||||
|
**kwargs)
|
||||||
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||||
|
|
||||||
|
# All is kosher so we can create the port
|
||||||
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT}
|
||||||
|
net_id = network['network']['id']
|
||||||
|
res = self._create_port(self.fmt, net_id=net_id,
|
||||||
|
arg_list=(portbindings.VNIC_TYPE,),
|
||||||
|
**kwargs)
|
||||||
|
port = self.deserialize('json', res)
|
||||||
|
self.assertEqual("direct", port['port'][portbindings.VNIC_TYPE])
|
||||||
|
self.assertEqual("dvs", port['port'][portbindings.VIF_TYPE])
|
||||||
|
self.assertEqual(
|
||||||
|
vlan_id,
|
||||||
|
port['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
||||||
|
|
||||||
|
# try to get the same port
|
||||||
|
req = self.new_show_request('ports', port['port']['id'], self.fmt)
|
||||||
|
sport = self.deserialize(self.fmt, req.get_response(self.api))
|
||||||
|
self.assertEqual("dvs", sport['port'][portbindings.VIF_TYPE])
|
||||||
|
self.assertEqual("direct", sport['port'][portbindings.VNIC_TYPE])
|
||||||
|
self.assertEqual(
|
||||||
|
vlan_id,
|
||||||
|
sport['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
||||||
|
|
||||||
|
def test_create_port_vnic_direct_flat(self):
|
||||||
|
self._test_create_port_vnic_direct(0)
|
||||||
|
|
||||||
|
def test_create_port_vnic_direct_vlan(self):
|
||||||
|
self._test_create_port_vnic_direct(10)
|
||||||
|
|
||||||
|
def test_create_port_vnic_direct_invalid_network(self):
|
||||||
|
with self.network(name='not vlan/flat') as net:
|
||||||
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
||||||
|
psec.PORTSECURITY: False}
|
||||||
|
net_id = net['network']['id']
|
||||||
|
res = self._create_port(self.fmt, net_id=net_id,
|
||||||
|
arg_list=(portbindings.VNIC_TYPE,
|
||||||
|
psec.PORTSECURITY),
|
||||||
|
**kwargs)
|
||||||
|
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
||||||
|
|
||||||
|
def test_update_vnic_direct(self):
|
||||||
|
with self._test_create_direct_network(vlan_id=7) as network:
|
||||||
|
with self.subnet(network=network) as subnet:
|
||||||
|
with self.port(subnet=subnet) as port:
|
||||||
|
# need to do two updates as the update for port security
|
||||||
|
# disabled requires that it can only change 2 items
|
||||||
|
data = {'port': {psec.PORTSECURITY: False,
|
||||||
|
'security_groups': []}}
|
||||||
|
req = self.new_update_request('ports',
|
||||||
|
data, port['port']['id'])
|
||||||
|
res = self.deserialize('json', req.get_response(self.api))
|
||||||
|
self.assertEqual(portbindings.VNIC_NORMAL,
|
||||||
|
res['port'][portbindings.VNIC_TYPE])
|
||||||
|
|
||||||
|
data = {'port': {portbindings.VNIC_TYPE:
|
||||||
|
portbindings.VNIC_DIRECT}}
|
||||||
|
|
||||||
|
req = self.new_update_request('ports',
|
||||||
|
data, port['port']['id'])
|
||||||
|
res = self.deserialize('json', req.get_response(self.api))
|
||||||
|
self.assertEqual(portbindings.VNIC_DIRECT,
|
||||||
|
res['port'][portbindings.VNIC_TYPE])
|
||||||
|
|
||||||
|
def test_port_invalid_vnic_type(self):
|
||||||
|
with self._test_create_direct_network(vlan_id=7) as network:
|
||||||
|
kwargs = {portbindings.VNIC_TYPE: 'invalid',
|
||||||
|
psec.PORTSECURITY: False}
|
||||||
|
net_id = network['network']['id']
|
||||||
|
res = self._create_port(self.fmt, net_id=net_id,
|
||||||
|
arg_list=(portbindings.VNIC_TYPE,
|
||||||
|
psec.PORTSECURITY),
|
||||||
|
**kwargs)
|
||||||
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
||||||
|
|
||||||
|
|
||||||
class NsxPTestSubnets(test_db_base_plugin_v2.TestSubnetsV2,
|
class NsxPTestSubnets(test_db_base_plugin_v2.TestSubnetsV2,
|
||||||
NsxPPluginTestCaseMixin):
|
NsxPPluginTestCaseMixin):
|
||||||
@ -1152,6 +1271,7 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||||||
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
||||||
super(NsxPTestSecurityGroup, self).setUp(plugin=plugin,
|
super(NsxPTestSecurityGroup, self).setUp(plugin=plugin,
|
||||||
ext_mgr=ext_mgr)
|
ext_mgr=ext_mgr)
|
||||||
|
self.project_id = test_db_base_plugin_v2.TEST_TENANT_ID
|
||||||
|
|
||||||
def test_create_security_group_rule_icmp_with_type_and_code(self):
|
def test_create_security_group_rule_icmp_with_type_and_code(self):
|
||||||
"""No non-zero icmp codes are currently supported by the NSX"""
|
"""No non-zero icmp codes are currently supported by the NSX"""
|
||||||
@ -1183,125 +1303,6 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||||||
for k, v, in keys:
|
for k, v, in keys:
|
||||||
self.assertEqual(rule['security_group_rule'][k], v)
|
self.assertEqual(rule['security_group_rule'][k], v)
|
||||||
|
|
||||||
def _test_create_direct_network(self, vlan_id=0):
|
|
||||||
net_type = vlan_id and 'vlan' or 'flat'
|
|
||||||
name = 'direct_net'
|
|
||||||
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
||||||
pnet.PHYSICAL_NETWORK: 'tzuuid'}
|
|
||||||
if vlan_id:
|
|
||||||
providernet_args[pnet.SEGMENTATION_ID] = vlan_id
|
|
||||||
|
|
||||||
mock_tt = mock.patch('vmware_nsxlib.v3.policy'
|
|
||||||
'.core_resources.NsxPolicyTransportZoneApi'
|
|
||||||
'.get_transport_type',
|
|
||||||
return_value=nsx_constants.TRANSPORT_TYPE_VLAN)
|
|
||||||
mock_tt.start()
|
|
||||||
return self.network(name=name,
|
|
||||||
providernet_args=providernet_args,
|
|
||||||
arg_list=(pnet.NETWORK_TYPE,
|
|
||||||
pnet.PHYSICAL_NETWORK,
|
|
||||||
pnet.SEGMENTATION_ID))
|
|
||||||
|
|
||||||
def _test_create_port_vnic_direct(self, vlan_id):
|
|
||||||
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
||||||
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
||||||
return_value=nsx_constants.TRANSPORT_TYPE_VLAN),\
|
|
||||||
self._test_create_direct_network(vlan_id=vlan_id) as network:
|
|
||||||
# Check that port security conflicts
|
|
||||||
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
||||||
psec.PORTSECURITY: True}
|
|
||||||
net_id = network['network']['id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id,
|
|
||||||
arg_list=(portbindings.VNIC_TYPE,
|
|
||||||
psec.PORTSECURITY),
|
|
||||||
**kwargs)
|
|
||||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
||||||
|
|
||||||
# Check that security group conflicts
|
|
||||||
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
||||||
'security_groups': [
|
|
||||||
'4cd70774-cc67-4a87-9b39-7d1db38eb087'],
|
|
||||||
psec.PORTSECURITY: False}
|
|
||||||
net_id = network['network']['id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id,
|
|
||||||
arg_list=(portbindings.VNIC_TYPE,
|
|
||||||
psec.PORTSECURITY),
|
|
||||||
**kwargs)
|
|
||||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
||||||
|
|
||||||
# All is kosher so we can create the port
|
|
||||||
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT}
|
|
||||||
net_id = network['network']['id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id,
|
|
||||||
arg_list=(portbindings.VNIC_TYPE,),
|
|
||||||
**kwargs)
|
|
||||||
port = self.deserialize('json', res)
|
|
||||||
self.assertEqual("direct", port['port'][portbindings.VNIC_TYPE])
|
|
||||||
self.assertEqual("dvs", port['port'][portbindings.VIF_TYPE])
|
|
||||||
self.assertEqual(
|
|
||||||
vlan_id,
|
|
||||||
port['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
|
||||||
|
|
||||||
# try to get the same port
|
|
||||||
req = self.new_show_request('ports', port['port']['id'], self.fmt)
|
|
||||||
sport = self.deserialize(self.fmt, req.get_response(self.api))
|
|
||||||
self.assertEqual("dvs", sport['port'][portbindings.VIF_TYPE])
|
|
||||||
self.assertEqual("direct", sport['port'][portbindings.VNIC_TYPE])
|
|
||||||
self.assertEqual(
|
|
||||||
vlan_id,
|
|
||||||
sport['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
|
||||||
|
|
||||||
def test_create_port_vnic_direct_flat(self):
|
|
||||||
self._test_create_port_vnic_direct(0)
|
|
||||||
|
|
||||||
def test_create_port_vnic_direct_vlan(self):
|
|
||||||
self._test_create_port_vnic_direct(10)
|
|
||||||
|
|
||||||
def test_create_port_vnic_direct_invalid_network(self):
|
|
||||||
with self.network(name='not vlan/flat') as net:
|
|
||||||
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
||||||
psec.PORTSECURITY: False}
|
|
||||||
net_id = net['network']['id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id,
|
|
||||||
arg_list=(portbindings.VNIC_TYPE,
|
|
||||||
psec.PORTSECURITY),
|
|
||||||
**kwargs)
|
|
||||||
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
|
||||||
|
|
||||||
def test_update_vnic_direct(self):
|
|
||||||
with self._test_create_direct_network(vlan_id=7) as network:
|
|
||||||
with self.subnet(network=network) as subnet:
|
|
||||||
with self.port(subnet=subnet) as port:
|
|
||||||
# need to do two updates as the update for port security
|
|
||||||
# disabled requires that it can only change 2 items
|
|
||||||
data = {'port': {psec.PORTSECURITY: False,
|
|
||||||
'security_groups': []}}
|
|
||||||
req = self.new_update_request('ports',
|
|
||||||
data, port['port']['id'])
|
|
||||||
res = self.deserialize('json', req.get_response(self.api))
|
|
||||||
self.assertEqual(portbindings.VNIC_NORMAL,
|
|
||||||
res['port'][portbindings.VNIC_TYPE])
|
|
||||||
|
|
||||||
data = {'port': {portbindings.VNIC_TYPE:
|
|
||||||
portbindings.VNIC_DIRECT}}
|
|
||||||
|
|
||||||
req = self.new_update_request('ports',
|
|
||||||
data, port['port']['id'])
|
|
||||||
res = self.deserialize('json', req.get_response(self.api))
|
|
||||||
self.assertEqual(portbindings.VNIC_DIRECT,
|
|
||||||
res['port'][portbindings.VNIC_TYPE])
|
|
||||||
|
|
||||||
def test_port_invalid_vnic_type(self):
|
|
||||||
with self._test_create_direct_network(vlan_id=7) as network:
|
|
||||||
kwargs = {portbindings.VNIC_TYPE: 'invalid',
|
|
||||||
psec.PORTSECURITY: False}
|
|
||||||
net_id = network['network']['id']
|
|
||||||
res = self._create_port(self.fmt, net_id=net_id,
|
|
||||||
arg_list=(portbindings.VNIC_TYPE,
|
|
||||||
psec.PORTSECURITY),
|
|
||||||
**kwargs)
|
|
||||||
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
||||||
|
|
||||||
@common_v3.with_no_dhcp_subnet
|
@common_v3.with_no_dhcp_subnet
|
||||||
def test_list_ports_security_group(self):
|
def test_list_ports_security_group(self):
|
||||||
return super(NsxPTestSecurityGroup,
|
return super(NsxPTestSecurityGroup,
|
||||||
@ -1319,6 +1320,65 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||||||
super(NsxPTestSecurityGroup, self).\
|
super(NsxPTestSecurityGroup, self).\
|
||||||
test_create_security_group_source_group_ip_and_ip_prefix()
|
test_create_security_group_source_group_ip_and_ip_prefix()
|
||||||
|
|
||||||
|
def _create_default_sg(self):
|
||||||
|
self.plugin._ensure_default_security_group(
|
||||||
|
context.get_admin_context(), self.project_id)
|
||||||
|
|
||||||
|
def test_sg_create_on_nsx(self):
|
||||||
|
"""Verify that a group and comm-map are created for a new SG"""
|
||||||
|
# Make sure the default SG is created before testing
|
||||||
|
self._create_default_sg()
|
||||||
|
name = description = 'sg1'
|
||||||
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
||||||
|
"NsxPolicyGroupApi.create_or_overwrite_with_conditions"
|
||||||
|
) as group_create,\
|
||||||
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
||||||
|
"NsxPolicyCommunicationMapApi."
|
||||||
|
"create_or_overwrite_map_only") as comm_map_create,\
|
||||||
|
self.security_group(name, description) as sg:
|
||||||
|
sg_id = sg['security_group']['id']
|
||||||
|
nsx_name = utils.get_name_and_uuid(name, sg_id)
|
||||||
|
group_create.assert_called_once_with(
|
||||||
|
nsx_name, self.project_id, group_id=sg_id,
|
||||||
|
description=description,
|
||||||
|
conditions=[mock.ANY], tags=mock.ANY)
|
||||||
|
comm_map_create.assert_called_once_with(
|
||||||
|
nsx_name, self.project_id, map_id=sg_id,
|
||||||
|
description=description,
|
||||||
|
tags=mock.ANY,
|
||||||
|
category=policy_constants.CATEGORY_ENVIRONMENT)
|
||||||
|
|
||||||
|
def test_sg_rule_create_on_nsx(self):
|
||||||
|
"""Verify that a comm-map entry is created for a new SG rule """
|
||||||
|
name = description = 'sg1'
|
||||||
|
direction = "ingress"
|
||||||
|
remote_ip_prefix = "10.0.0.0/24"
|
||||||
|
protocol = "tcp"
|
||||||
|
port_range_min = 80
|
||||||
|
port_range_max = 80
|
||||||
|
with self.security_group(name, description) as sg:
|
||||||
|
sg_id = sg['security_group']['id']
|
||||||
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
||||||
|
"NsxPolicyCommunicationMapApi.create_entry"
|
||||||
|
) as entry_create,\
|
||||||
|
self.security_group_rule(sg_id, direction,
|
||||||
|
protocol, port_range_min,
|
||||||
|
port_range_max,
|
||||||
|
remote_ip_prefix) as rule:
|
||||||
|
rule_id = rule['security_group_rule']['id']
|
||||||
|
scope = [self.plugin.nsxpolicy.group.get_path(
|
||||||
|
self.project_id, sg_id)]
|
||||||
|
entry_create.assert_called_once_with(
|
||||||
|
rule_id, self.project_id, sg_id, entry_id=rule_id,
|
||||||
|
description='',
|
||||||
|
direction=nsx_constants.IN,
|
||||||
|
action=policy_constants.ACTION_ALLOW,
|
||||||
|
service_ids=mock.ANY,
|
||||||
|
source_groups=mock.ANY,
|
||||||
|
dest_groups=mock.ANY,
|
||||||
|
scope=scope,
|
||||||
|
logged=False)
|
||||||
|
|
||||||
|
|
||||||
class NsxPTestL3ExtensionManager(object):
|
class NsxPTestL3ExtensionManager(object):
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user