Dan Wendlandt 54bc60850f prevent invalid deletion of ports used by L3 devices
bug 1039947

The bug report noted that an admin user could delete a port that stored
the underlying IP allocation for a floating IP.  This patch prevents
the direct deletion of ports via the API for ports that are used as
router interfaces, router gateways, or for floating IPs.

Adds a unit test to check such an invalid delete, and also updates
existing unit tests so that they do not trip over the new checks.

Change-Id: Ief28e3181583428d55259275a7c21151a4a4fa9b
2012-08-31 04:29:12 -07:00

226 lines
7.4 KiB
Python

"""
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 Nicira Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Dan Wendlandt, Nicira, Inc
#
"""
from abc import abstractmethod
from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
from quantum.common import exceptions as qexception
from quantum.extensions import extensions
from quantum import manager
from quantum.openstack.common import cfg
from quantum import quota
# L3 Exceptions
class RouterNotFound(qexception.NotFound):
    """Error stating that the router named by ``router_id`` was not found.

    The message template is filled from a ``router_id`` keyword.
    """
    message = _("Router %(router_id)s could not be found")
class RouterInUse(qexception.InUse):
    """Error stating that the router ``router_id`` still has active ports.

    The message template is filled from a ``router_id`` keyword.
    """
    message = _("Router %(router_id)s still has active ports")
class FloatingIPNotFound(qexception.NotFound):
    """Error stating that the floating IP ``floatingip_id`` was not found.

    The message template is filled from a ``floatingip_id`` keyword.
    """
    message = _("Floating IP %(floatingip_id)s could not be found")
class ExternalGatewayForFloatingIPNotFound(qexception.NotFound):
    """Error stating that no external gateway is reachable from a subnet.

    Reported when a port cannot be associated with a floating IP for that
    reason.  The message template is filled from ``subnet_id`` and
    ``port_id`` keywords.
    """
    message = _("Could not find an external network gateway reachable "
                "from subnet %(subnet_id)s. Therefore, cannot associate "
                "Port %(port_id)s with a Floating IP.")
class FloatingIPPortAlreadyAssociated(qexception.InUse):
    """Error stating that port ``port_id`` already has a floating IP.

    The message template is filled from a ``port_id`` keyword.
    """
    message = _("Port %(port_id)s already has a floating IP"
                " associated with it")
class L3PortInUse(qexception.InUse):
    """Error stating that a port cannot be deleted through the port API.

    The message explains that the port's ``device_owner`` is the reason.
    The template is filled from ``port_id`` and ``device_owner`` keywords.
    """
    message = _("Port %(port_id)s has owner %(device_owner)s and therefore"
                " cannot be deleted directly via the port API.")
def _validate_uuid_or_none(data, valid_values=None):
if data is None:
return None
return attr._validate_regex(data, attr.UUID_PATTERN)
# Register the validator under the 'type:uuid_or_none' key so that attribute
# maps in this module (floatingips' 'port_id') can refer to it by name.
attr.validators['type:uuid_or_none'] = _validate_uuid_or_none
# Attribute Map
#
# Per-collection attribute definitions for the resources this extension
# exposes.  L3.get_resources() looks each collection up by name and passes
# the inner dict to base.create_resource() as its ``params`` argument.
RESOURCE_ATTRIBUTE_MAP = {
    'routers': {
        'id': {
            'allow_post': False,
            'allow_put': False,
            'validate': {'type:regex': attr.UUID_PATTERN},
            'is_visible': True,
        },
        'name': {
            'allow_post': True,
            'allow_put': True,
            'is_visible': True,
            'default': '',
        },
        'admin_state_up': {
            'allow_post': True,
            'allow_put': True,
            'default': True,
            'convert_to': attr.convert_to_boolean,
            'validate': {'type:boolean': None},
            'is_visible': True,
        },
        'status': {
            'allow_post': False,
            'allow_put': False,
            'is_visible': True,
        },
        'tenant_id': {
            'allow_post': True,
            'allow_put': False,
            'required_by_policy': True,
            'is_visible': True,
        },
        'external_gateway_info': {
            'allow_post': True,
            'allow_put': True,
            'is_visible': True,
            'default': None,
        },
    },
    'floatingips': {
        'id': {
            'allow_post': False,
            'allow_put': False,
            'is_visible': True,
        },
        'floating_ip_address': {
            'allow_post': False,
            'allow_put': False,
            'is_visible': True,
        },
        'floating_network_id': {
            'allow_post': True,
            'allow_put': False,
            'validate': {'type:regex': attr.UUID_PATTERN},
            'is_visible': True,
        },
        'router_id': {
            'allow_post': False,
            'allow_put': False,
            'is_visible': True,
            'default': None,
        },
        # port_id may be None; it uses the 'type:uuid_or_none' validator
        # that this module registers.
        'port_id': {
            'allow_post': True,
            'allow_put': True,
            'validate': {'type:uuid_or_none': None},
            'is_visible': True,
            'default': None,
        },
        'fixed_ip_address': {
            'allow_post': True,
            'allow_put': True,
            'validate': {'type:ip_address_or_none': None},
            'is_visible': True,
            'default': None,
        },
        'tenant_id': {
            'allow_post': True,
            'allow_put': False,
            'required_by_policy': True,
            'is_visible': True,
        },
    },
}
# Integer quota options for the two resources added by this extension.
# They are registered in the 'QUOTAS' configuration group at import time.
l3_quota_opts = [
    cfg.IntOpt(
        'quota_router',
        default=10,
        help='number of routers allowed per tenant, -1 for unlimited'
    ),
    cfg.IntOpt(
        'quota_floatingip',
        default=50,
        help='number of floating IPs allowed per tenant, '
             '-1 for unlimited'
    ),
]
cfg.CONF.register_opts(l3_quota_opts, 'QUOTAS')
class L3(object):
    """Extension descriptor for the Quantum router / floating IP API.

    Every method is a classmethod: the identity getters return fixed
    strings and ``get_resources`` builds the API resource extensions.
    """

    @classmethod
    def get_name(cls):
        """Return the human-readable extension name."""
        return "Quantum Router"

    @classmethod
    def get_alias(cls):
        """Return the extension alias."""
        return "os-quantum-router"

    @classmethod
    def get_description(cls):
        """Return a one-sentence description of the extension."""
        return ("Router abstraction for basic L3 forwarding"
                " between L2 Quantum networks and access to external"
                " networks via a NAT gateway.")

    @classmethod
    def get_namespace(cls):
        """Return the extension's namespace URL."""
        return "http://docs.openstack.org/ext/os-quantum-router/api/v1.0"

    @classmethod
    def get_updated(cls):
        """Return the extension's last-updated timestamp string."""
        return "2012-07-20T10:00:00-00:00"

    @classmethod
    def get_resources(cls):
        """Build the resource extensions for routers and floating IPs.

        For each of 'router' and 'floatingip' this registers the resource
        with the quota engine, creates a controller backed by the active
        plugin, and wraps it in an ``extensions.ResourceExtension``.

        :returns: list of ``extensions.ResourceExtension`` objects, one per
            resource, in the order router, floatingip.
        """
        exts = []
        plugin = manager.QuantumManager.get_plugin()
        for resource_name in ('router', 'floatingip'):
            collection_name = resource_name + "s"
            params = RESOURCE_ATTRIBUTE_MAP.get(collection_name, dict())

            # Only routers expose extra member actions (interface add/remove).
            member_actions = {}
            if resource_name == 'router':
                member_actions = {'add_router_interface': 'PUT',
                                  'remove_router_interface': 'PUT'}

            quota.QUOTAS.register_resource_by_name(resource_name)

            controller = base.create_resource(collection_name,
                                              resource_name,
                                              plugin, params,
                                              member_actions=member_actions)

            ex = extensions.ResourceExtension(collection_name,
                                              controller,
                                              member_actions=member_actions)
            exts.append(ex)

        return exts
class RouterPluginBase(object):
    """Interface listing the router and floating IP operations of a plugin.

    NOTE(review): the methods are decorated with ``@abstractmethod`` but
    the class derives from plain ``object`` with no ABCMeta metaclass, so
    Python does not enforce them: the class can be instantiated and each
    method simply returns None.  Kept as is, because enforcing it could
    break existing plugins that implement only part of the interface.
    """

    @abstractmethod
    def create_router(self, context, router):
        """Plugin hook for creating a router; no default behaviour."""
        pass

    @abstractmethod
    def update_router(self, context, id, router):
        """Plugin hook for updating router ``id``; no default behaviour."""
        pass

    @abstractmethod
    def get_router(self, context, id, fields=None):
        """Plugin hook for fetching router ``id``; no default behaviour."""
        pass

    @abstractmethod
    def delete_router(self, context, id):
        """Plugin hook for deleting router ``id``; no default behaviour."""
        pass

    @abstractmethod
    def get_routers(self, context, filters=None, fields=None):
        """Plugin hook for listing routers; no default behaviour."""
        pass

    @abstractmethod
    def add_router_interface(self, context, router_id, interface_info):
        """Plugin hook for adding a router interface; no default behaviour."""
        pass

    @abstractmethod
    def remove_router_interface(self, context, router_id, interface_info):
        """Plugin hook for removing a router interface; no default behaviour."""
        pass

    @abstractmethod
    def create_floatingip(self, context, floatingip):
        """Plugin hook for creating a floating IP; no default behaviour."""
        pass

    @abstractmethod
    def update_floatingip(self, context, id, floatingip):
        """Plugin hook for updating floating IP ``id``; no default behaviour."""
        pass

    @abstractmethod
    def get_floatingip(self, context, id, fields=None):
        """Plugin hook for fetching floating IP ``id``; no default behaviour."""
        pass

    @abstractmethod
    def delete_floatingip(self, context, id):
        """Plugin hook for deleting floating IP ``id``; no default behaviour."""
        pass

    @abstractmethod
    def get_floatingips(self, context, filters=None, fields=None):
        """Plugin hook for listing floating IPs; no default behaviour."""
        pass