Adds the 'public network' concept to Quantum

Implements blueprint quantum-v2-public-networks

This patch allows Quantum to handle public networks. It modifies the
API adding a new attribute to the network resource ('shared')
and enhances the policy engine in order to handle the behaviour of
the service wrt shared networks.
Policy.json specifies a default behaviour which can be changed by
the administrator, even at runtime.
Tests added to test_db_plugin validate 'obvious' behaviour - such as
that only the ports belonging to a given tenant should be returned
even when they are queried on a public network.
Tests added to test_policy instead validate the changes added to the
policy engine.

Change-Id: I0087d449a677ed29357cd3cb4d8580bece940fdf
This commit is contained in:
Salvatore Orlando 2012-07-26 01:10:02 -07:00
parent 319b05ff5f
commit 19738b9fab
11 changed files with 674 additions and 120 deletions

View File

@ -1,23 +1,37 @@
{ {
"admin_or_owner": [["role:admin"], ["tenant_id:%(tenant_id)s"]], "admin_or_owner": [["role:admin"], ["tenant_id:%(tenant_id)s"]],
"admin_or_network_owner": [["role:admin"], ["tenant_id:%(network_tenant_id)s"]],
"admin_only": [["role:admin"]],
"regular_user": [],
"default": [["rule:admin_or_owner"]], "default": [["rule:admin_or_owner"]],
"admin_api": [["role:admin"]], "extension:provider_network:view": [["rule:admin_only"]],
"extension:provider_network:view": [["rule:admin_api"]], "extension:provider_network:set": [["rule:admin_only"]],
"extension:provider_network:set": [["rule:admin_api"]],
"create_subnet": [], "networks:private:read": [["rule:admin_or_owner"]],
"networks:private:write": [["rule:admin_or_owner"]],
"networks:shared:read": [["rule:regular_user"]],
"networks:shared:write": [["rule:admin_only"]],
"create_subnet": [["rule:admin_or_network_owner"]],
"get_subnet": [["rule:admin_or_owner"]], "get_subnet": [["rule:admin_or_owner"]],
"update_subnet": [["rule:admin_or_owner"]], "update_subnet": [["rule:admin_or_network_owner"]],
"delete_subnet": [["rule:admin_or_owner"]], "delete_subnet": [["rule:admin_or_network_owner"]],
"create_network": [], "create_network": [],
"get_network": [["rule:admin_or_owner"]], "get_network": [],
"update_network": [["rule:admin_or_owner"]], "create_network:shared": [["rule:admin_only"]],
"delete_network": [["rule:admin_or_owner"]], "update_network": [],
"update_network:shared": [["rule:admin_only"]],
"delete_network": [],
"create_port": [], "create_port": [],
"create_port:mac_address": [["rule:admin_or_network_owner"]],
"create_port:host_routes": [["rule:admin_or_network_owner"]],
"create_port:fixed_ips": [["rule:admin_or_network_owner"]],
"get_port": [["rule:admin_or_owner"]], "get_port": [["rule:admin_or_owner"]],
"update_port": [["rule:admin_or_owner"]], "update_port": [["rule:admin_or_owner"]],
"update_port:host_routes": [["rule:admin_or_network_owner"]],
"update_port:fixed_ips": [["rule:admin_or_network_owner"]],
"delete_port": [["rule:admin_or_owner"]] "delete_port": [["rule:admin_or_owner"]]
} }

View File

@ -14,16 +14,8 @@
# limitations under the License. # limitations under the License.
ATTR_NOT_SPECIFIED = object() ATTR_NOT_SPECIFIED = object()
# Defining a constant to avoid repeating string literal in several modules
# Note: a default of ATTR_NOT_SPECIFIED indicates that an SHARED = 'shared'
# attribute is not required, but will be generated by the plugin
# if it is not specified. Particularly, a value of ATTR_NOT_SPECIFIED
# is different from an attribute that has been specified with a value of
# None. For example, if 'gateway_ip' is ommitted in a request to
# create a subnet, the plugin will receive ATTR_NOT_SPECIFIED
# and the default gateway_ip will be generated.
# However, if gateway_ip is specified as None, this means that
# the subnet does not have a gateway IP.
import logging import logging
import netaddr import netaddr
@ -137,10 +129,20 @@ validators = {'type:boolean': _validate_boolean,
# and the default gateway_ip will be generated. # and the default gateway_ip will be generated.
# However, if gateway_ip is specified as None, this means that # However, if gateway_ip is specified as None, this means that
# the subnet does not have a gateway IP. # the subnet does not have a gateway IP.
# Some of the following attributes are used by the policy engine. # The following is a short reference for understanding attribute info:
# They are explicitly marked with the required_by_policy flag to ensure # default: default value of the attribute (if missing, the attribute
# they are always returned by a plugin for policy processing, even if # becomes mandatory.
# they are not specified in the 'fields' query param # allow_post: the attribute can be used on POST requests.
# allow_put: the attribute can be used on PUT requests.
# validate: specifies rules for validating data in the attribute.
# convert_to: transformation to apply to the value before it is returned
# is_visible: the attribute is returned in GET responses.
# required_by_policy: the attribute is required by the policy engine and
# should therefore be filled by the API layer even if not present in
# request body.
# enforce_policy: the attribute is actively part of the policy enforcing
# mechanism, ie: there might be rules which refer to this attribute.
RESOURCE_ATTRIBUTE_MAP = { RESOURCE_ATTRIBUTE_MAP = {
'networks': { 'networks': {
'id': {'allow_post': False, 'allow_put': False, 'id': {'allow_post': False, 'allow_put': False,
@ -160,7 +162,15 @@ RESOURCE_ATTRIBUTE_MAP = {
'is_visible': True}, 'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False, 'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True, 'required_by_policy': True,
'is_visible': True} 'is_visible': True},
SHARED: {'allow_post': True,
'allow_put': True,
'default': False,
'convert_to': convert_to_boolean,
'validate': {'type:boolean': None},
'is_visible': True,
'required_by_policy': True,
'enforce_policy': True},
}, },
'ports': { 'ports': {
'id': {'allow_post': False, 'allow_put': False, 'id': {'allow_post': False, 'allow_put': False,
@ -179,12 +189,15 @@ RESOURCE_ATTRIBUTE_MAP = {
'mac_address': {'allow_post': True, 'allow_put': False, 'mac_address': {'allow_post': True, 'allow_put': False,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'validate': {'type:mac_address': None}, 'validate': {'type:mac_address': None},
'enforce_policy': True,
'is_visible': True}, 'is_visible': True},
'fixed_ips': {'allow_post': True, 'allow_put': True, 'fixed_ips': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': True}, 'is_visible': True},
'host_routes': {'allow_post': True, 'allow_put': True, 'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED, 'default': ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': False}, 'is_visible': False},
'device_id': {'allow_post': True, 'allow_put': True, 'device_id': {'allow_post': True, 'allow_put': True,
'default': '', 'default': '',
@ -235,3 +248,11 @@ RESOURCE_ATTRIBUTE_MAP = {
'is_visible': True}, 'is_visible': True},
} }
} }
# Associates to each resource its own parent resource
# Resources without parents, such as networks, are not in this list
RESOURCE_HIERARCHY_MAP = {
'ports': {'parent': 'networks', 'identified_by': 'network_id'},
'subnets': {'parent': 'networks', 'identified_by': 'network_id'}
}

View File

@ -40,6 +40,7 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
exceptions.OverlappingAllocationPools: webob.exc.HTTPConflict, exceptions.OverlappingAllocationPools: webob.exc.HTTPConflict,
exceptions.OutOfBoundsAllocationPool: webob.exc.HTTPBadRequest, exceptions.OutOfBoundsAllocationPool: webob.exc.HTTPBadRequest,
exceptions.InvalidAllocationPool: webob.exc.HTTPBadRequest, exceptions.InvalidAllocationPool: webob.exc.HTTPBadRequest,
exceptions.InvalidSharedSetting: webob.exc.HTTPConflict,
} }
QUOTAS = quota.QUOTAS QUOTAS = quota.QUOTAS
@ -122,8 +123,7 @@ class Controller(object):
self._resource = resource self._resource = resource
self._attr_info = attr_info self._attr_info = attr_info
self._policy_attrs = [name for (name, info) in self._attr_info.items() self._policy_attrs = [name for (name, info) in self._attr_info.items()
if 'required_by_policy' in info if info.get('required_by_policy')]
and info['required_by_policy']]
self._publisher_id = notifier_api.publisher_id('network') self._publisher_id = notifier_api.publisher_id('network')
def _is_visible(self, attr): def _is_visible(self, attr):
@ -160,33 +160,32 @@ class Controller(object):
obj_list = obj_getter(request.context, **kwargs) obj_list = obj_getter(request.context, **kwargs)
# Check authz # Check authz
if do_authz: if do_authz:
# FIXME(salvatore-orlando): obj_getter might return references to
# other resources. Must check authZ on them too.
# Omit items from list that should not be visible # Omit items from list that should not be visible
obj_list = [obj for obj in obj_list obj_list = [obj for obj in obj_list
if policy.check(request.context, if policy.check(request.context,
"get_%s" % self._resource, "get_%s" % self._resource,
obj)] obj,
plugin=self._plugin)]
return {self._collection: [self._view(obj, return {self._collection: [self._view(obj,
fields_to_strip=fields_to_add) fields_to_strip=fields_to_add)
for obj in obj_list]} for obj in obj_list]}
def _item(self, request, id, do_authz=False): def _item(self, request, id, do_authz=False, field_list=None):
"""Retrieves and formats a single element of the requested entity""" """Retrieves and formats a single element of the requested entity"""
# NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the
# plugin before returning.
field_list, added_fields = self._do_field_list(fields(request))
kwargs = {'verbose': verbose(request), kwargs = {'verbose': verbose(request),
'fields': field_list} 'fields': field_list}
action = "get_%s" % self._resource action = "get_%s" % self._resource
obj_getter = getattr(self._plugin, action) obj_getter = getattr(self._plugin, action)
obj = obj_getter(request.context, id, **kwargs) obj = obj_getter(request.context, id, **kwargs)
# Check authz # Check authz
# FIXME(salvatore-orlando): obj_getter might return references to
# other resources. Must check authZ on them too.
if do_authz: if do_authz:
policy.enforce(request.context, action, obj) policy.enforce(request.context, action, obj, plugin=self._plugin)
return obj
return {self._resource: self._view(obj, fields_to_strip=added_fields)}
def index(self, request): def index(self, request):
"""Returns a list of the requested entity""" """Returns a list of the requested entity"""
@ -195,7 +194,16 @@ class Controller(object):
def show(self, request, id): def show(self, request, id):
"""Returns detailed information about the requested entity""" """Returns detailed information about the requested entity"""
try: try:
return self._item(request, id, True) # NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped
# away by the plugin before returning.
field_list, added_fields = self._do_field_list(fields(request))
return {self._resource:
self._view(self._item(request,
id,
do_authz=True,
field_list=field_list),
fields_to_strip=added_fields)}
except exceptions.PolicyNotAuthorized: except exceptions.PolicyNotAuthorized:
# To avoid giving away information, pretend that it # To avoid giving away information, pretend that it
# doesn't exist # doesn't exist
@ -221,11 +229,10 @@ class Controller(object):
request, request,
item[self._resource], item[self._resource],
) )
policy.enforce( policy.enforce(request.context,
request.context,
action, action,
item[self._resource], item[self._resource],
) plugin=self._plugin)
count = QUOTAS.count(request.context, self._resource, count = QUOTAS.count(request.context, self._resource,
self._plugin, self._collection, self._plugin, self._collection,
item[self._resource]['tenant_id']) item[self._resource]['tenant_id'])
@ -236,13 +243,17 @@ class Controller(object):
request, request,
body[self._resource] body[self._resource]
) )
policy.enforce(request.context, action, body[self._resource]) policy.enforce(request.context,
action,
body[self._resource],
plugin=self._plugin)
count = QUOTAS.count(request.context, self._resource, count = QUOTAS.count(request.context, self._resource,
self._plugin, self._collection, self._plugin, self._collection,
body[self._resource]['tenant_id']) body[self._resource]['tenant_id'])
kwargs = {self._resource: count + 1} kwargs = {self._resource: count + 1}
QUOTAS.limit_check(request.context, **kwargs) QUOTAS.limit_check(request.context, **kwargs)
except exceptions.PolicyNotAuthorized: except exceptions.PolicyNotAuthorized:
LOG.exception("Create operation not authorized")
raise webob.exc.HTTPForbidden() raise webob.exc.HTTPForbidden()
obj_creator = getattr(self._plugin, action) obj_creator = getattr(self._plugin, action)
@ -266,9 +277,12 @@ class Controller(object):
action = "delete_%s" % self._resource action = "delete_%s" % self._resource
# Check authz # Check authz
obj = self._item(request, id)[self._resource] obj = self._item(request, id)
try: try:
policy.enforce(request.context, action, obj) policy.enforce(request.context,
action,
obj,
plugin=self._plugin)
except exceptions.PolicyNotAuthorized: except exceptions.PolicyNotAuthorized:
# To avoid giving away information, pretend that it # To avoid giving away information, pretend that it
# doesn't exist # doesn't exist
@ -293,11 +307,21 @@ class Controller(object):
payload) payload)
body = self._prepare_request_body(request.context, body, False) body = self._prepare_request_body(request.context, body, False)
action = "update_%s" % self._resource action = "update_%s" % self._resource
# Load object to check authz
# but pass only attributes in the original body and required
# by the policy engine to the policy 'brain'
field_list = [name for (name, value) in self._attr_info.iteritems()
if ('required_by_policy' in value and
value['required_by_policy'] or
not 'default' in value)]
orig_obj = self._item(request, id, field_list=field_list)
orig_obj.update(body)
# Check authz
orig_obj = self._item(request, id)[self._resource]
try: try:
policy.enforce(request.context, action, orig_obj) policy.enforce(request.context,
action,
orig_obj,
plugin=self._plugin)
except exceptions.PolicyNotAuthorized: except exceptions.PolicyNotAuthorized:
# To avoid giving away information, pretend that it # To avoid giving away information, pretend that it
# doesn't exist # doesn't exist
@ -319,7 +343,7 @@ class Controller(object):
if (('tenant_id' in res_dict and if (('tenant_id' in res_dict and
res_dict['tenant_id'] != context.tenant_id and res_dict['tenant_id'] != context.tenant_id and
not context.is_admin)): not context.is_admin)):
msg = _("Specifying 'tenant_id' other than authenticated" msg = _("Specifying 'tenant_id' other than authenticated "
"tenant in request requires admin privileges") "tenant in request requires admin privileges")
raise webob.exc.HTTPBadRequest(msg) raise webob.exc.HTTPBadRequest(msg)
@ -345,7 +369,6 @@ class Controller(object):
raise webob.exc.HTTPBadRequest(_("Resource body required")) raise webob.exc.HTTPBadRequest(_("Resource body required"))
body = body or {self._resource: {}} body = body or {self._resource: {}}
if self._collection in body and allow_bulk: if self._collection in body and allow_bulk:
bulk_body = [self._prepare_request_body(context, bulk_body = [self._prepare_request_body(context,
{self._resource: b}, {self._resource: b},
@ -411,17 +434,21 @@ class Controller(object):
msg = _("Invalid input for %(attr)s. " msg = _("Invalid input for %(attr)s. "
"Reason: %(reason)s.") % msg_dict "Reason: %(reason)s.") % msg_dict
raise webob.exc.HTTPUnprocessableEntity(msg) raise webob.exc.HTTPUnprocessableEntity(msg)
return body return body
def _validate_network_tenant_ownership(self, request, resource_item): def _validate_network_tenant_ownership(self, request, resource_item):
# TODO(salvatore-orlando): consider whether this check can be folded
# in the policy engine
if self._resource not in ('port', 'subnet'): if self._resource not in ('port', 'subnet'):
return return
network = self._plugin.get_network(
network_owner = self._plugin.get_network(
request.context, request.context,
resource_item['network_id'], resource_item['network_id'])
)['tenant_id'] # do not perform the check on shared networks
if network.get('shared'):
return
network_owner = network['tenant_id']
if network_owner != resource_item['tenant_id']: if network_owner != resource_item['tenant_id']:
msg = _("Tenant %(tenant_id)s not allowed to " msg = _("Tenant %(tenant_id)s not allowed to "

View File

@ -195,3 +195,8 @@ class OverQuota(QuantumException):
class InvalidQuotaValue(QuantumException): class InvalidQuotaValue(QuantumException):
message = _("Change would make usage less than 0 for the following " message = _("Change would make usage less than 0 for the following "
"resources: %(unders)s") "resources: %(unders)s")
class InvalidSharedSetting(QuantumException):
message = _("Unable to reconfigure sharing settings for network"
"%(network). Multiple tenants are using it")

View File

@ -65,9 +65,13 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
query = context.session.query(model) query = context.session.query(model)
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id # NOTE(jkoelker) non-admin queries are scoped to their tenant_id
# NOTE(salvatore-orlando): unless the model allows for shared objects
if not context.is_admin and hasattr(model, 'tenant_id'): if not context.is_admin and hasattr(model, 'tenant_id'):
if hasattr(model, 'shared'):
query = query.filter((model.tenant_id == context.tenant_id) |
(model.shared))
else:
query = query.filter(model.tenant_id == context.tenant_id) query = query.filter(model.tenant_id == context.tenant_id)
return query return query
def _get_by_id(self, context, model, id, joins=(), verbose=None): def _get_by_id(self, context, model, id, joins=(), verbose=None):
@ -610,12 +614,32 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
subnet['cidr']) subnet['cidr'])
return pools return pools
def _validate_shared_update(self, context, id, original, updated):
# The only case that needs to be validated is when 'shared'
# goes from True to False
if updated['shared'] == original.shared or updated['shared']:
return
ports = self._model_query(
context, models_v2.Port).filter(
models_v2.Port.network_id == id).all()
subnets = self._model_query(
context, models_v2.Subnet).filter(
models_v2.Subnet.network_id == id).all()
tenant_ids = set([port['tenant_id'] for port in ports] +
[subnet['tenant_id'] for subnet in subnets])
# raise if multiple tenants found or if the only tenant found
# is not the owner of the network
if (len(tenant_ids) > 1 or len(tenant_ids) == 1 and
tenant_ids.pop() != original.tenant_id):
raise q_exc.InvalidSharedSetting(network=original.name)
def _make_network_dict(self, network, fields=None): def _make_network_dict(self, network, fields=None):
res = {'id': network['id'], res = {'id': network['id'],
'name': network['name'], 'name': network['name'],
'tenant_id': network['tenant_id'], 'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'], 'admin_state_up': network['admin_state_up'],
'status': network['status'], 'status': network['status'],
'shared': network['shared'],
'subnets': [subnet['id'] 'subnets': [subnet['id']
for subnet in network['subnets']]} for subnet in network['subnets']]}
@ -659,6 +683,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
id=n.get('id') or utils.str_uuid(), id=n.get('id') or utils.str_uuid(),
name=n['name'], name=n['name'],
admin_state_up=n['admin_state_up'], admin_state_up=n['admin_state_up'],
shared=n['shared'],
status="ACTIVE") status="ACTIVE")
context.session.add(network) context.session.add(network)
return self._make_network_dict(network) return self._make_network_dict(network)
@ -667,6 +692,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
n = network['network'] n = network['network']
with context.session.begin(): with context.session.begin():
network = self._get_network(context, id) network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
self._validate_shared_update(context, id, network, n)
network.update(n) network.update(n)
return self._make_network_dict(network) return self._make_network_dict(network)

View File

@ -126,3 +126,4 @@ class Network(model_base.BASEV2, HasId, HasTenant):
subnets = orm.relationship(Subnet, backref='networks') subnets = orm.relationship(Subnet, backref='networks')
status = sa.Column(sa.String(16)) status = sa.Column(sa.String(16))
admin_state_up = sa.Column(sa.Boolean) admin_state_up = sa.Column(sa.Boolean)
shared = sa.Column(sa.Boolean)

View File

@ -18,9 +18,7 @@
""" """
Policy engine for quantum. Largely copied from nova. Policy engine for quantum. Largely copied from nova.
""" """
from quantum.api.v2 import attributes
import os.path
from quantum.common import exceptions from quantum.common import exceptions
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
import quantum.common.utils as utils import quantum.common.utils as utils
@ -56,41 +54,139 @@ def _set_brain(data):
policy.set_brain(policy.Brain.load_json(data, default_rule)) policy.set_brain(policy.Brain.load_json(data, default_rule))
def check(context, action, target): def _get_resource_and_action(action):
data = action.split(':', 1)[0].split('_', 1)
return ("%ss" % data[-1], data[0] != 'get')
def _is_attribute_explicitly_set(attribute_name, resource, target):
"""Verify that an attribute is present and has a non-default value"""
if ('default' in resource[attribute_name] and
target.get(attribute_name, attributes.ATTR_NOT_SPECIFIED) !=
attributes.ATTR_NOT_SPECIFIED):
if (target[attribute_name] != resource[attribute_name]['default']):
return True
return False
def _build_target(action, original_target, plugin, context):
"""Augment dictionary of target attributes for policy engine.
This routine adds to the dictionary attributes belonging to the
"parent" resource of the targeted one.
"""
target = original_target.copy()
resource, _w = _get_resource_and_action(action)
hierarchy_info = attributes.RESOURCE_HIERARCHY_MAP.get(resource, None)
if hierarchy_info and plugin:
# use the 'singular' version of the resource name
parent_resource = hierarchy_info['parent'][:-1]
parent_id = hierarchy_info['identified_by']
f = getattr(plugin, 'get_%s' % parent_resource)
# f *must* exist, if not found it is better to let quantum explode
# Note: we do not use admin context
data = f(context, target[parent_id], fields=['tenant_id'])
target['%s_tenant_id' % parent_resource] = data['tenant_id']
return target
def _create_access_rule_match(resource, is_write, shared):
if shared == resource[attributes.SHARED]:
return ('rule:%s:%s:%s' % (resource,
shared and 'shared' or 'private',
is_write and 'write' or 'read'), )
def _build_perm_match(action, target):
"""Create the permission rule match.
Given the current access right on a network (shared/private), and
the type of the current operation (read/write), builds a match
rule of the type <resource>:<sharing_mode>:<operation_type>
"""
resource, is_write = _get_resource_and_action(action)
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
if (resource in res_map and
attributes.SHARED in res_map[resource] and
attributes.SHARED in target):
return ('rule:%s:%s:%s' % (resource,
target[attributes.SHARED]
and 'shared' or 'private',
is_write and 'write' or 'read'), )
def _build_match_list(action, target):
"""Create the list of rules to match for a given action.
The list of policy rules to be matched is built in the following way:
1) add entries for matching permission on objects
2) add an entry for the specific action (e.g.: create_network)
3) add an entry for attributes of a resource for which the action
is being executed (e.g.: create_network:shared)
"""
match_list = ('rule:%s' % action,)
resource, is_write = _get_resource_and_action(action)
# assigning to variable with short name for improving readability
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
if resource in res_map:
for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name,
res_map[resource],
target):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute and is_write:
match_list += ('rule:%s:%s' % (action,
attribute_name),)
# add permission-based rule (for shared resources)
perm_match = _build_perm_match(action, target)
if perm_match:
match_list += perm_match
# the policy engine must AND between all the rules
return [match_list]
def check(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context. """Verifies that the action is valid on the target in this context.
:param context: quantum context :param context: quantum context
:param action: string representing the action to be checked :param action: string representing the action to be checked
this should be colon separated for clarity. this should be colon separated for clarity.
:param object: dictionary representing the object of the action :param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}`` location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: quantum plugin used to retrieve information required
for augmenting the target
:return: Returns True if access is permitted else False. :return: Returns True if access is permitted else False.
""" """
init() init()
match_list = ('rule:%s' % action,) real_target = _build_target(action, target, plugin, context)
match_list = _build_match_list(action, real_target)
credentials = context.to_dict() credentials = context.to_dict()
return policy.enforce(match_list, target, credentials) return policy.enforce(match_list, real_target, credentials)
def enforce(context, action, target): def enforce(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context. """Verifies that the action is valid on the target in this context.
:param context: quantum context :param context: quantum context
:param action: string representing the action to be checked :param action: string representing the action to be checked
this should be colon separated for clarity. this should be colon separated for clarity.
:param object: dictionary representing the object of the action :param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}`` location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: quantum plugin used to retrieve information required
for augmenting the target
:raises quantum.exceptions.PolicyNotAllowed: if verification fails. :raises quantum.exceptions.PolicyNotAllowed: if verification fails.
""" """
init() init()
match_list = ('rule:%s' % action,) real_target = _build_target(action, target, plugin, context)
match_list = _build_match_list(action, real_target)
credentials = context.to_dict() credentials = context.to_dict()
policy.enforce(match_list, real_target, credentials,
policy.enforce(match_list, target, credentials,
exceptions.PolicyNotAuthorized, action=action) exceptions.PolicyNotAuthorized, action=action)

View File

@ -233,37 +233,45 @@ class APIv2TestCase(APIv2TestBase):
fields=mock.ANY, fields=mock.ANY,
verbose=True) verbose=True)
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
if info.get('required_by_policy')]
fields = base_fields
fields.extend(policy_attrs)
return fields
def test_fields(self): def test_fields(self):
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_networks.return_value = [] instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': 'foo'}) self.api.get(_get_path('networks'), {'fields': 'foo'})
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY, instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY, filters=mock.ANY,
fields=['foo', fields=fields,
'tenant_id'],
verbose=mock.ANY) verbose=mock.ANY)
def test_fields_multiple(self): def test_fields_multiple(self):
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_networks.return_value = [] instance.get_networks.return_value = []
fields = self._do_field_list('networks', ['foo', 'bar'])
self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']}) self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']})
instance.get_networks.assert_called_once_with(mock.ANY, instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY, filters=mock.ANY,
fields=['foo', 'bar', fields=fields,
'tenant_id'],
verbose=mock.ANY) verbose=mock.ANY)
def test_fields_multiple_with_empty(self): def test_fields_multiple_with_empty(self):
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_networks.return_value = [] instance.get_networks.return_value = []
fields = self._do_field_list('networks', ['foo'])
self.api.get(_get_path('networks'), {'fields': ['foo', '']}) self.api.get(_get_path('networks'), {'fields': ['foo', '']})
instance.get_networks.assert_called_once_with(mock.ANY, instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY, filters=mock.ANY,
fields=['foo', fields=fields,
'tenant_id'],
verbose=mock.ANY) verbose=mock.ANY)
def test_fields_empty(self): def test_fields_empty(self):
@ -359,10 +367,10 @@ class APIv2TestCase(APIv2TestBase):
self.api.get(_get_path('networks'), {'foo': 'bar', 'fields': 'foo'}) self.api.get(_get_path('networks'), {'foo': 'bar', 'fields': 'foo'})
filters = {'foo': ['bar']} filters = {'foo': ['bar']}
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY, instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters, filters=filters,
fields=['foo', fields=fields,
'tenant_id'],
verbose=mock.ANY) verbose=mock.ANY)
def test_filters_with_verbose(self): def test_filters_with_verbose(self):
@ -385,10 +393,10 @@ class APIv2TestCase(APIv2TestBase):
'fields': 'foo', 'fields': 'foo',
'verbose': 'true'}) 'verbose': 'true'})
filters = {'foo': ['bar']} filters = {'foo': ['bar']}
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY, instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters, filters=filters,
fields=['foo', fields=fields,
'tenant_id'],
verbose=True) verbose=True)
@ -405,6 +413,7 @@ class JSONV2TestCase(APIv2TestBase):
'admin_state_up': True, 'admin_state_up': True,
'status': "ACTIVE", 'status': "ACTIVE",
'tenant_id': real_tenant_id, 'tenant_id': real_tenant_id,
'shared': False,
'subnets': []} 'subnets': []}
return_value = [input_dict] return_value = [input_dict]
instance = self.plugin.return_value instance = self.plugin.return_value
@ -416,6 +425,7 @@ class JSONV2TestCase(APIv2TestBase):
# expect full list returned # expect full list returned
self.assertEqual(len(res.json['networks']), 1) self.assertEqual(len(res.json['networks']), 1)
output_dict = res.json['networks'][0] output_dict = res.json['networks'][0]
input_dict['shared'] = False
self.assertEqual(len(input_dict), len(output_dict)) self.assertEqual(len(input_dict), len(output_dict))
for k, v in input_dict.iteritems(): for k, v in input_dict.iteritems():
self.assertEqual(v, output_dict[k]) self.assertEqual(v, output_dict[k])
@ -456,7 +466,9 @@ class JSONV2TestCase(APIv2TestBase):
def test_create_use_defaults(self): def test_create_use_defaults(self):
net_id = _uuid() net_id = _uuid()
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}} initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
full_input = {'network': {'admin_state_up': True, 'subnets': []}} full_input = {'network': {'admin_state_up': True,
'shared': False,
'subnets': []}}
full_input['network'].update(initial_input['network']) full_input['network'].update(initial_input['network'])
return_value = {'id': net_id, 'status': "ACTIVE"} return_value = {'id': net_id, 'status': "ACTIVE"}
@ -489,7 +501,7 @@ class JSONV2TestCase(APIv2TestBase):
# tenant_id should be fetched from env # tenant_id should be fetched from env
initial_input = {'network': {'name': 'net1'}} initial_input = {'network': {'name': 'net1'}}
full_input = {'network': {'admin_state_up': True, 'subnets': [], full_input = {'network': {'admin_state_up': True, 'subnets': [],
'tenant_id': tenant_id}} 'shared': False, 'tenant_id': tenant_id}}
full_input['network'].update(initial_input['network']) full_input['network'].update(initial_input['network'])
return_value = {'id': net_id, 'status': "ACTIVE"} return_value = {'id': net_id, 'status': "ACTIVE"}
@ -643,7 +655,8 @@ class JSONV2TestCase(APIv2TestBase):
if req_tenant_id: if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)} env = {'quantum.context': context.Context('', req_tenant_id)}
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': real_tenant_id} instance.get_network.return_value = {'tenant_id': real_tenant_id,
'shared': False}
instance.delete_network.return_value = None instance.delete_network.return_value = None
res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())), res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())),
@ -665,9 +678,14 @@ class JSONV2TestCase(APIv2TestBase):
def _test_get(self, req_tenant_id, real_tenant_id, expected_code, def _test_get(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False): expect_errors=False):
env = {} env = {}
shared = False
if req_tenant_id: if req_tenant_id:
env = {'quantum.context': context.Context('', req_tenant_id)} env = {'quantum.context': context.Context('', req_tenant_id)}
data = {'tenant_id': real_tenant_id} if req_tenant_id.endswith('another'):
shared = True
env['quantum.context'].roles = ['tenant_admin']
data = {'tenant_id': real_tenant_id, 'shared': shared}
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_network.return_value = data instance.get_network.return_value = data
@ -689,6 +707,10 @@ class JSONV2TestCase(APIv2TestBase):
self._test_get(tenant_id + "bad", tenant_id, self._test_get(tenant_id + "bad", tenant_id,
exc.HTTPNotFound.code, expect_errors=True) exc.HTTPNotFound.code, expect_errors=True)
def test_get_keystone_shared_network(self):
tenant_id = _uuid()
self._test_get(tenant_id + "another", tenant_id, 200)
def _test_update(self, req_tenant_id, real_tenant_id, expected_code, def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False): expect_errors=False):
env = {} env = {}
@ -700,7 +722,8 @@ class JSONV2TestCase(APIv2TestBase):
return_value.update(data['network'].copy()) return_value.update(data['network'].copy())
instance = self.plugin.return_value instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': real_tenant_id} instance.get_network.return_value = {'tenant_id': real_tenant_id,
'shared': False}
instance.update_network.return_value = return_value instance.update_network.return_value = return_value
res = self.api.put_json(_get_path('networks', res = self.api.put_json(_get_path('networks',
@ -887,9 +910,12 @@ class ExtensionTestCase(unittest.TestCase):
def test_extended_create(self): def test_extended_create(self):
net_id = _uuid() net_id = _uuid()
data = {'network': {'name': 'net1', 'admin_state_up': True, initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid(),
'tenant_id': _uuid(), 'subnets': [],
'v2attrs:something_else': "abc"}} 'v2attrs:something_else': "abc"}}
data = {'network': {'admin_state_up': True, 'subnets': [],
'shared': False}}
data['network'].update(initial_input['network'])
return_value = {'subnets': [], 'status': "ACTIVE", return_value = {'subnets': [], 'status': "ACTIVE",
'id': net_id, 'id': net_id,
'v2attrs:something': "123"} 'v2attrs:something': "123"}
@ -898,7 +924,7 @@ class ExtensionTestCase(unittest.TestCase):
instance = self.plugin.return_value instance = self.plugin.return_value
instance.create_network.return_value = return_value instance.create_network.return_value = return_value
res = self.api.post_json(_get_path('networks'), data) res = self.api.post_json(_get_path('networks'), initial_input)
instance.create_network.assert_called_with(mock.ANY, instance.create_network.assert_called_with(mock.ANY,
network=data) network=data)

View File

@ -122,56 +122,87 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
data = {'network': {'name': name, data = {'network': {'name': name,
'admin_state_up': admin_status_up, 'admin_state_up': admin_status_up,
'tenant_id': self._tenant_id}} 'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'tenant_id', 'public'): for arg in ('admin_state_up', 'tenant_id', 'shared'):
# Arg must be present and not empty # Arg must be present and not empty
if arg in kwargs and kwargs[arg]: if arg in kwargs and kwargs[arg]:
data['network'][arg] = kwargs[arg] data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt) network_req = self.new_create_request('networks', data, fmt)
if ('set_context' in kwargs and if (kwargs.get('set_context') and 'tenant_id' in kwargs):
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request # create a specific auth context for this request
network_req.environ['quantum.context'] = context.Context( network_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id']) '', kwargs['tenant_id'])
return network_req.get_response(self.api) return network_req.get_response(self.api)
def _create_subnet(self, fmt, tenant_id, net_id, gateway_ip, cidr, def _create_subnet(self, fmt, net_id, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True):
data = {'subnet': {'tenant_id': tenant_id,
'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version,
'tenant_id': self._tenant_id,
'enable_dhcp': enable_dhcp}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
if allocation_pools:
data['subnet']['allocation_pools'] = allocation_pools
subnet_req = self.new_create_request('subnets', data, fmt)
return subnet_req.get_response(self.api)
def _create_port(self, fmt, net_id, custom_req_body=None,
expected_res_status=None, **kwargs): expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
'ip_version': 4,
'tenant_id': self._tenant_id}}
for arg in ('gateway_ip', 'allocation_pools',
'ip_version', 'tenant_id',
'enable_dhcp'):
# Arg must be present and not null (but can be false)
if arg in kwargs and kwargs[arg] is not None:
data['subnet'][arg] = kwargs[arg]
subnet_req = self.new_create_request('subnets', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
subnet_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
subnet_res = subnet_req.get_response(self.api)
if expected_res_status:
self.assertEqual(subnet_res.status_int, expected_res_status)
return subnet_res
def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
content_type = 'application/' + fmt content_type = 'application/' + fmt
data = {'port': {'network_id': net_id, data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}} 'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'device_id', 'mac_address', for arg in ('admin_state_up', 'device_id',
'name', 'fixed_ips'): 'mac_address', 'fixed_ips',
'name', 'tenant_id'):
# Arg must be present and not empty # Arg must be present and not empty
if arg in kwargs and kwargs[arg]: if arg in kwargs and kwargs[arg]:
data['port'][arg] = kwargs[arg] data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt) port_req = self.new_create_request('ports', data, fmt)
return port_req.get_response(self.api) if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _list_ports(self, fmt, expected_res_status=None,
net_id=None, **kwargs):
query_params = None
if net_id:
query_params = "network_id=%s" % net_id
port_req = self.new_list_request('ports', fmt, query_params)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _make_subnet(self, fmt, network, gateway, cidr, def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True): allocation_pools=None, ip_version=4, enable_dhcp=True):
res = self._create_subnet(fmt, res = self._create_subnet(fmt,
network['network']['tenant_id'], net_id=network['network']['id'],
network['network']['id'], cidr=cidr,
gateway, gateway_ip=gateway,
cidr, tenant_id=network['network']['tenant_id'],
allocation_pools=allocation_pools, allocation_pools=allocation_pools,
ip_version=ip_version, ip_version=ip_version,
enable_dhcp=enable_dhcp) enable_dhcp=enable_dhcp)
@ -190,9 +221,21 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
req.get_response(self.api) req.get_response(self.api)
@contextlib.contextmanager @contextlib.contextmanager
def network(self, name='net1', admin_status_up=True, fmt='json'): def network(self, name='net1',
res = self._create_network(fmt, name, admin_status_up) admin_status_up=True,
fmt='json',
**kwargs):
res = self._create_network(fmt,
name,
admin_status_up,
**kwargs)
network = self.deserialize(fmt, res) network = self.deserialize(fmt, res)
# TODO(salvatore-orlando): do exception handling in this test module
# in a uniform way (we do it differently for ports, subnets, and nets
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield network yield network
self._delete('networks', network['network']['id']) self._delete('networks', network['network']['id'])
@ -373,6 +416,19 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
res = port_req.get_response(self.api) res = port_req.get_response(self.api)
self.assertEquals(res.status_int, 403) self.assertEquals(res.status_int, 403)
def test_create_port_public_network(self):
keys = [('admin_state_up', True), ('status', 'ACTIVE')]
with self.network(shared=True) as network:
port_res = self._create_port('json',
network['network']['id'],
201,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize('json', port_res)
for k, v in keys:
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
def test_list_ports(self): def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2): with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json') req = self.new_list_request('ports', 'json')
@ -382,6 +438,41 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertTrue(port1['port']['id'] in ids) self.assertTrue(port1['port']['id'] in ids)
self.assertTrue(port2['port']['id'] in ids) self.assertTrue(port2['port']['id'] in ids)
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
portres_1 = self._create_port('json',
network['network']['id'],
201,
tenant_id='tenant_1',
set_context=True)
portres_2 = self._create_port('json',
network['network']['id'],
201,
tenant_id='tenant_2',
set_context=True)
port1 = self.deserialize('json', portres_1)
port2 = self.deserialize('json', portres_2)
def _list_and_test_ports(expected_len, ports, tenant_id=None):
set_context = tenant_id is not None
port_res = self._list_ports('json',
200,
network['network']['id'],
tenant_id=tenant_id,
set_context=set_context)
port_list = self.deserialize('json', port_res)
self.assertEqual(len(port_list['ports']), expected_len)
ids = [p['id'] for p in port_list['ports']]
for port in ports:
self.assertIn(port['port']['id'], ids)
# Admin request - must return both ports
_list_and_test_ports(2, [port1, port2])
# Tenant_1 request - must return single port
_list_and_test_ports(1, [port1], tenant_id='tenant_1')
# Tenant_2 request - must return single port
_list_and_test_ports(1, [port2], tenant_id='tenant_2')
def test_show_port(self): def test_show_port(self):
with self.port() as port: with self.port() as port:
req = self.new_show_request('ports', port['port']['id'], 'json') req = self.new_show_request('ports', port['port']['id'], 'json')
@ -396,6 +487,20 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
res = req.get_response(self.api) res = req.get_response(self.api)
self.assertEquals(res.status_int, 404) self.assertEquals(res.status_int, 404)
def test_delete_port_public_network(self):
with self.network(shared=True) as network:
port_res = self._create_port('json',
network['network']['id'],
201,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize('json', port_res)
port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
# Todo: verify!!!
def test_update_port(self): def test_update_port(self):
with self.port() as port: with self.port() as port:
data = {'port': {'admin_state_up': False}} data = {'port': {'admin_state_up': False}}
@ -615,9 +720,12 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
# Get a IPv4 and IPv6 address # Get a IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id'] tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id'] net_id = subnet['subnet']['network_id']
res = self._create_subnet(fmt, tenant_id, net_id=net_id, res = self._create_subnet(fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::0/124', cidr='2607:f0d0:1002:51::0/124',
ip_version=6, gateway_ip=None) ip_version=6,
gateway_ip=None)
subnet2 = self.deserialize(fmt, res) subnet2 = self.deserialize(fmt, res)
kwargs = {"fixed_ips": kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']}, [{'subnet_id': subnet['subnet']['id']},
@ -834,11 +942,125 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
def test_create_network(self): def test_create_network(self):
name = 'net1' name = 'net1'
keys = [('subnets', []), ('name', name), ('admin_state_up', True), keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE')] ('status', 'ACTIVE'), ('shared', False)]
with self.network(name=name) as net: with self.network(name=name) as net:
for k, v in keys: for k, v in keys:
self.assertEquals(net['network'][k], v) self.assertEquals(net['network'][k], v)
def test_create_public_network(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with self.network(name=name, shared=True) as net:
for k, v in keys:
self.assertEquals(net['network'][k], v)
def test_create_public_network_no_admin_tenant(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
tenant_id="another_tenant",
set_context=True):
pass
self.assertEquals(ctx_manager.exception.code, 403)
def test_update_network(self):
with self.network() as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['network']['name'],
data['network']['name'])
def test_update_shared_network_noadmin_returns_403(self):
with self.network(shared=True) as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
req.environ['quantum.context'] = context.Context('', 'somebody')
res = req.get_response(self.api)
# The API layer always returns 404 on updates in place of 403
self.assertEqual(res.status_int, 404)
def test_update_network_set_shared(self):
with self.network(shared=False) as network:
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertTrue(res['network']['shared'])
def test_update_network_set_not_shared_single_tenant(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertFalse(res['network']['shared'])
def test_update_network_set_not_shared_other_tenant_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
self._create_port('json',
network['network']['id'],
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
with self.network(shared=True) as network:
self._create_port('json',
network['network']['id'],
201,
tenant_id='somebody_else',
set_context=True)
self._create_subnet('json',
network['network']['id'],
'10.0.0.0/24',
201,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
def test_list_networks(self): def test_list_networks(self):
with self.network(name='net1') as net1: with self.network(name='net1') as net1:
with self.network(name='net2') as net2: with self.network(name='net2') as net2:

View File

@ -29,6 +29,7 @@ import quantum
from quantum.common import exceptions from quantum.common import exceptions
from quantum.common import utils from quantum.common import utils
from quantum import context from quantum import context
from quantum.openstack.common import importutils
from quantum.openstack.common import policy as common_policy from quantum.openstack.common import policy as common_policy
from quantum import policy from quantum import policy
@ -206,3 +207,113 @@ class DefaultPolicyTestCase(unittest.TestCase):
self._set_brain("default_noexist") self._set_brain("default_noexist")
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce, self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {}) self.context, "example:noexist", {})
class QuantumPolicyTestCase(unittest.TestCase):
def setUp(self):
super(QuantumPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.rules = {
"admin_or_network_owner": [["role:admin"],
["tenant_id:%(network_tenant_id)s"]],
"admin_only": [["role:admin"]],
"regular_user": [["role:user"]],
"default": [],
"networks:private:read": [["rule:admin_only"]],
"networks:private:write": [["rule:admin_only"]],
"networks:shared:read": [["rule:regular_user"]],
"networks:shared:write": [["rule:admin_only"]],
"create_network": [],
"create_network:shared": [["rule:admin_only"]],
"update_network": [],
"update_network:shared": [["rule:admin_only"]],
"get_network": [],
"create_port:mac": [["rule:admin_or_network_owner"]],
}
def fakepolicyinit():
common_policy.set_brain(common_policy.Brain(self.rules))
self.patcher = mock.patch.object(quantum.policy,
'init',
new=fakepolicyinit)
self.patcher.start()
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
self.plugin = plugin_klass()
def tearDown(self):
self.patcher.stop()
def test_nonadmin_write_on_private_returns_403(self):
action = "update_network"
user_context = context.Context('', "user", roles=['user'])
# 384 is the int value of the bitmask for rw------
target = {'tenant_id': 'the_owner', 'shared': False}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
user_context, action, target, None)
def test_nonadmin_read_on_private_returns_403(self):
action = "get_network"
user_context = context.Context('', "user", roles=['user'])
# 384 is the int value of the bitmask for rw------
target = {'tenant_id': 'the_owner', 'shared': False}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
user_context, action, target, None)
def test_nonadmin_write_on_shared_returns_403(self):
action = "update_network"
user_context = context.Context('', "user", roles=['user'])
# 384 is the int value of the bitmask for rw-r--r--
target = {'tenant_id': 'the_owner', 'shared': True}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
user_context, action, target, None)
def test_nonadmin_read_on_shared_returns_200(self):
action = "get_network"
user_context = context.Context('', "user", roles=['user'])
# 420 is the int value of the bitmask for rw-r--r--
target = {'tenant_id': 'the_owner', 'shared': True}
result = policy.enforce(user_context, action, target, None)
self.assertEqual(result, None)
def _test_enforce_adminonly_attribute(self, action):
admin_context = context.get_admin_context()
target = {'shared': True}
result = policy.enforce(admin_context, action, target, None)
self.assertEqual(result, None)
def test_enforce_adminonly_attribute_create(self):
self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network')
def test_enforce_adminoly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self):
action = "get_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
result = policy.enforce(self.context, action, target, None)
self.assertIsNone(result)
def test_enforce_parentresource_owner(self):
def fakegetnetwork(*args, **kwargs):
return {'tenant_id': 'fake'}
action = "create_port:mac"
with mock.patch.object(self.plugin, 'get_network', new=fakegetnetwork):
target = {'network_id': 'whatever'}
result = policy.enforce(self.context, action, target, self.plugin)
self.assertIsNone(result)

View File

@ -17,7 +17,8 @@ from quantum import wsgi
from quantum.wsgi import Serializer from quantum.wsgi import Serializer
def create_request(path, body, content_type, method='GET', query_string=None): def create_request(path, body, content_type, method='GET',
query_string=None, context=None):
if query_string: if query_string:
url = "%s?%s" % (path, query_string) url = "%s?%s" % (path, query_string)
else: else:
@ -27,4 +28,6 @@ def create_request(path, body, content_type, method='GET', query_string=None):
req.headers = {} req.headers = {}
req.headers['Accept'] = content_type req.headers['Accept'] = content_type
req.body = body req.body = body
if context:
req.environ['quantum.context'] = context
return req return req