Prevent update_port ip_address from matching address_pair

Previously one could update a port to match one of the address_pairs.
This patch prevents that.

Fixes bug: 1223646

Change-Id: Ic8cecfdb83b3fd9d2691bb41e0427cf578577fc8
This commit is contained in:
Aaron Rosen 2013-09-10 20:32:28 -07:00
parent 3e564f8e03
commit cfcdf9c7a5
5 changed files with 52 additions and 4 deletions

View File

@ -55,9 +55,6 @@ class AllowedAddressPairsMixin(object):
if ((fixed_ip['ip_address'] == address_pair['ip_address'])
and (port['mac_address'] ==
address_pair['mac_address'])):
#TODO(arosen) - need to query for address pairs
# to check for same condition if fixed_ips change to
# be an address pair.
raise addr_pair.AddressPairMatchesPortFixedIPAndMac()
db_pair = AllowedAddressPair(
port_id=port['id'],
@ -67,6 +64,14 @@ class AllowedAddressPairsMixin(object):
return allowed_address_pairs
def _check_fixed_ips_and_address_pairs_no_overlap(self, context, port):
address_pairs = self.get_allowed_address_pairs(context, port['id'])
for fixed_ip in port['fixed_ips']:
for address_pair in address_pairs:
if (fixed_ip['ip_address'] == address_pair['ip_address']
and port['mac_address'] == address_pair['mac_address']):
raise addr_pair.AddressPairMatchesPortFixedIPAndMac()
def get_allowed_address_pairs(self, context, port_id):
pairs = (context.session.query(AllowedAddressPair).
filter_by(port_id=port_id))

View File

@ -468,6 +468,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
need_port_update_notify = False
session = context.session
changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
original_port = super(Ml2Plugin, self).get_port(context, id)
updated_port = super(Ml2Plugin, self).update_port(context, id,
@ -478,6 +479,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context, updated_port,
port['port'][addr_pair.ADDRESS_PAIRS])
need_port_update_notify = True
elif changed_fixed_ips:
self._check_fixed_ips_and_address_pairs_no_overlap(
context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
network = self.get_network(context, original_port['network_id'])

View File

@ -1205,6 +1205,7 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
return port_data
def update_port(self, context, id, port):
changed_fixed_ips = 'fixed_ips' in port['port']
delete_security_groups = self._check_update_deletes_security_groups(
port)
has_security_groups = self._check_update_has_security_groups(port)
@ -1246,6 +1247,9 @@ class NvpPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
elif changed_fixed_ips:
self._check_fixed_ips_and_address_pairs_no_overlap(context,
ret_port)
# checks if security groups were updated adding/modifying
# security groups, port security is set and port has ip
if not (has_ip and ret_port[psec.PORTSECURITY]):

View File

@ -566,6 +566,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def update_port(self, context, id, port):
session = context.session
need_port_update_notify = False
changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
original_port = super(OVSNeutronPluginV2, self).get_port(
context, id)
@ -577,7 +578,9 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
context, updated_port,
port['port'][addr_pair.ADDRESS_PAIRS])
need_port_update_notify = True
elif changed_fixed_ips:
self._check_fixed_ips_and_address_pairs_no_overlap(
context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
self._process_portbindings_create_and_update(context,

View File

@ -64,6 +64,7 @@ class AllowedAddressPairTestPlugin(portsecurity_db.PortSecurityDbMixin,
return port['port']
def update_port(self, context, id, port):
changed_fixed_ips = 'fixed_ips' in port['port']
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
port)
has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
@ -81,6 +82,9 @@ class AllowedAddressPairTestPlugin(portsecurity_db.PortSecurityDbMixin,
self._process_create_allowed_address_pairs(
context, ret_port,
ret_port[addr_pair.ADDRESS_PAIRS])
elif changed_fixed_ips:
self._check_fixed_ips_and_address_pairs_no_overlap(context,
ret_port)
return ret_port
@ -196,6 +200,34 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
address_pairs)
self._delete('ports', port['port']['id'])
def test_update_fixed_ip_to_address_pair_ip_fail(self):
with self.network() as net:
with self.subnet(network=net):
address_pairs = [{'ip_address': '10.0.0.65'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_pair.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)['port']
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.65'}]}}
req = self.new_update_request('ports', data, port['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 400)
self._delete('ports', port['id'])
def test_update_fixed_ip_to_address_pair_with_mac_fail(self):
with self.network() as net:
with self.subnet(network=net):
res = self._create_port(self.fmt, net['network']['id'])
port = self.deserialize(self.fmt, res)['port']
address_pairs = [
{'mac_address': port['mac_address'],
'ip_address': port['fixed_ips'][0]['ip_address']}]
data = {'port': {addr_pair.ADDRESS_PAIRS: address_pairs}}
req = self.new_update_request('ports', data, port['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 400)
self._delete('ports', port['id'])
def test_create_address_gets_port_mac(self):
with self.network() as net:
address_pairs = [{'ip_address': '23.23.23.23'}]