Merge "Check for VPN Objects when deleting interfaces"

This commit is contained in:
Jenkins 2014-10-21 17:37:57 +00:00 committed by Gerrit Code Review
commit 74efa3c63b
4 changed files with 71 additions and 0 deletions

View File

@ -316,6 +316,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
router.gw_port = None router.gw_port = None
context.session.add(router) context.session.add(router)
context.session.expire(gw_port) context.session.expire(gw_port)
vpnservice = manager.NeutronManager.get_service_plugins().get(
constants.VPN)
if vpnservice:
vpnservice.check_router_in_use(context, router_id)
self._core_plugin.delete_port( self._core_plugin.delete_port(
admin_ctx, gw_port['id'], l3_port_check=False) admin_ctx, gw_port['id'], l3_port_check=False)
@ -518,6 +522,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
subnet_db = self._core_plugin._get_subnet(context, subnet_id) subnet_db = self._core_plugin._get_subnet(context, subnet_id)
subnet_cidr = netaddr.IPNetwork(subnet_db['cidr']) subnet_cidr = netaddr.IPNetwork(subnet_db['cidr'])
fip_qry = context.session.query(FloatingIP) fip_qry = context.session.query(FloatingIP)
vpnservice = manager.NeutronManager.get_service_plugins().get(
constants.VPN)
if vpnservice:
vpnservice.check_subnet_in_use(context, subnet_id)
for fip_db in fip_qry.filter_by(router_id=router_id): for fip_db in fip_qry.filter_by(router_id=router_id):
if netaddr.IPAddress(fip_db['fixed_ip_address']) in subnet_cidr: if netaddr.IPAddress(fip_db['fixed_ip_address']) in subnet_cidr:
raise l3.RouterInterfaceInUseByFloatingIP( raise l3.RouterInterfaceInUseByFloatingIP(

View File

@ -602,6 +602,16 @@ class VPNPluginDb(vpnaas.VPNPluginBase, base_db.CommonDbMixin):
router_id=router_id, router_id=router_id,
vpnservice_id=vpnservices[0]['id']) vpnservice_id=vpnservices[0]['id'])
def check_subnet_in_use(self, context, subnet_id):
with context.session.begin(subtransactions=True):
vpnservices = context.session.query(VPNService).filter_by(
subnet_id=subnet_id
).first()
if vpnservices:
raise vpnaas.SubnetInUseByVPNService(
subnet_id=subnet_id,
vpnservice_id=vpnservices['id'])
class VPNPluginRpcDbMixin(): class VPNPluginRpcDbMixin():
def _get_agent_hosting_vpn_services(self, context, host): def _get_agent_hosting_vpn_services(self, context, host):

View File

@ -64,6 +64,10 @@ class RouterInUseByVPNService(qexception.InUse):
message = _("Router %(router_id)s is used by VPNService %(vpnservice_id)s") message = _("Router %(router_id)s is used by VPNService %(vpnservice_id)s")
class SubnetInUseByVPNService(qexception.InUse):
message = _("Subnet %(subnet_id)s is used by VPNService %(vpnservice_id)s")
class VPNStateInvalidToUpdate(qexception.BadRequest): class VPNStateInvalidToUpdate(qexception.BadRequest):
message = _("Invalid state %(state)s of vpnaas resource %(id)s" message = _("Invalid state %(state)s of vpnaas resource %(id)s"
" for updating") " for updating")

View File

@ -867,6 +867,55 @@ class TestVpnaas(VPNPluginDbTestCase):
if k in expected), if k in expected),
expected) expected)
def test_delete_router_interface_in_use_by_vpnservice(self):
"""Test delete router interface in use by vpn service."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.vpnservice(subnet=subnet,
router=router):
self._router_interface_action('remove',
router['router']['id'],
subnet['subnet']['id'],
None,
expected_code=webob.exc.
HTTPConflict.code)
def test_delete_external_gateway_interface_in_use_by_vpnservice(self):
"""Test delete external gateway interface in use by vpn service."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.subnet(cidr='11.0.0.0/24') as public_sub:
self._set_net_external(
public_sub['subnet']['network_id'])
self._add_external_gateway_to_router(
router['router']['id'],
public_sub['subnet']['network_id'])
with self.vpnservice(subnet=subnet,
router=router):
self._remove_external_gateway_from_router(
router['router']['id'],
public_sub['subnet']['network_id'],
expected_code=webob.exc.HTTPConflict.code)
def test_router_update_after_ipsec_site_connection(self):
"""Test case to update router after vpn connection."""
rname1 = "router_one"
rname2 = "router_two"
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router(name=rname1) as r:
with self.vpnservice(subnet=subnet,
router=r
) as vpnservice:
self.ipsec_site_connection(
name='connection1', vpnservice=vpnservice
)
body = self._show('routers', r['router']['id'])
self.assertEqual(body['router']['name'], rname1)
body = self._update('routers', r['router']['id'],
{'router': {'name': rname2}})
body = self._show('routers', r['router']['id'])
self.assertEqual(body['router']['name'], rname2)
def test_update_vpnservice(self): def test_update_vpnservice(self):
"""Test case to update a vpnservice.""" """Test case to update a vpnservice."""
name = 'new_vpnservice1' name = 'new_vpnservice1'