Merge "Fix IPv6 RA security group rule for DVR"

This commit is contained in:
Jenkins 2014-12-13 01:39:11 +00:00 committed by Gerrit Code Review
commit 7b4be85e11
2 changed files with 75 additions and 2 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import netaddr
from sqlalchemy import or_
from sqlalchemy.orm import exc
from neutron.common import constants as q_const
@ -351,8 +352,10 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
models_v2.IPAllocation.subnet_id == subnet['id'])
query = query.filter(
models_v2.IPAllocation.ip_address == subnet['gateway_ip'])
query = query.filter(models_v2.Port.device_owner ==
q_const.DEVICE_OWNER_ROUTER_INTF)
query = query.filter(or_(models_v2.Port.device_owner ==
q_const.DEVICE_OWNER_ROUTER_INTF,
models_v2.Port.device_owner ==
q_const.DEVICE_OWNER_DVR_INTERFACE))
try:
mac_address = query.one()[0]
except (exc.NoResultFound, exc.MultipleResultsFound):

View File

@ -783,6 +783,76 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', gateway_port_id)
self._delete('ports', interface_port_id)
def test_security_group_ra_rules_for_devices_ipv6_dvr(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_GLOBAL']
with self.network() as n:
with contextlib.nested(self.subnet(n,
gateway_ip=fake_gateway,
cidr=fake_prefix,
ip_version=6,
ipv6_ra_mode=const.IPV6_SLAAC),
self.security_group()) as (subnet_v6,
sg1):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', const.PROTO_NAME_TCP, '22',
'22',
ethertype=const.IPv6)
rules = {
'security_group_rules': [rule1['security_group_rule']]}
self._make_security_group_rule(self.fmt, rules)
# Create DVR router interface port
gateway_res = self._make_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
'ip_address': fake_gateway}],
device_owner=const.DEVICE_OWNER_DVR_INTERFACE)
gateway_mac = gateway_res['port']['mac_address']
gateway_port_id = gateway_res['port']['id']
gateway_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
const.IPV6_LLA_PREFIX,
gateway_mac))
ports_rest1 = self._make_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg1_id])
port_id1 = ports_rest1['port']['id']
self.rpc.devices = {port_id1: ports_rest1['port']}
devices = [port_id1, 'no_exist_device']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'ingress',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
{'direction': 'ingress',
'protocol': const.PROTO_NAME_ICMP_V6,
'ethertype': const.IPv6,
'source_ip_prefix': gateway_lla_ip,
'source_port_range_min': const.ICMPV6_TYPE_RA},
]
self.assertEqual(port_rpc['security_group_rules'],
expected)
self._delete('ports', port_id1)
# Note(xuhanp): remove gateway port's fixed_ips or gateway port
# deletion will be prevented.
data = {'port': {'fixed_ips': []}}
req = self.new_update_request('ports', data, gateway_port_id)
self.deserialize(self.fmt, req.get_response(self.api))
self._delete('ports', gateway_port_id)
def test_security_group_ra_rules_for_devices_ipv6_gateway_lla(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP['IPv6_LLA']