Support private registry - API layer

Introduce API resource 'registry' which can be used
by users to CRUD private registry.

Partial-Bug: #1702830
Change-Id: Icce91518a637ef59837641642f7b7ac174681081
This commit is contained in:
Hongbin Lu 2019-01-04 02:59:26 +00:00
parent 3c99f090f3
commit c88568d05b
14 changed files with 843 additions and 7 deletions

View File

@ -31,6 +31,7 @@ from zun.api.controllers.v1 import images as image_controller
from zun.api.controllers.v1 import networks as network_controller
from zun.api.controllers.v1 import quota_classes as quota_classes_controller
from zun.api.controllers.v1 import quotas as quotas_controller
from zun.api.controllers.v1 import registries as registries_controller
from zun.api.controllers.v1 import zun_services
from zun.api.controllers import versions as ver
from zun.api import http_error
@ -75,7 +76,8 @@ class V1(controllers_base.APIBase):
'capsules',
'availability_zones',
'quotas',
'quota_classes'
'quota_classes',
'registries',
)
@staticmethod
@ -146,6 +148,10 @@ class V1(controllers_base.APIBase):
pecan.request.host_url,
'quota_classes', '',
bookmark=True)]
v1.registries = [link.make_link('self', pecan.request.host_url,
'registries', ''),
link.make_link('bookmark', pecan.request.host_url,
'registries', '', bookmark=True)]
return v1
@ -161,6 +167,7 @@ class Controller(controllers_base.Controller):
capsules = capsule_controller.CapsuleController()
quotas = quotas_controller.QuotaController()
quota_classes = quota_classes_controller.QuotaClassController()
registries = registries_controller.RegistryController()
@pecan.expose('json')
def get(self):

View File

@ -39,3 +39,7 @@ class Collection(base.APIBase):
return link.make_link('next', pecan.request.host_url,
resource_url, next_args)['href']
class Item(base.APIBase):
pass

View File

@ -0,0 +1,227 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import pecan
from zun.api.controllers import base
from zun.api.controllers import link
from zun.api.controllers.v1 import collection
from zun.api.controllers.v1.schemas import registries as schema
from zun.api.controllers.v1.views import registries_view as view
from zun.api import utils as api_utils
from zun.api import validation
from zun.common import exception
from zun.common.i18n import _
from zun.common.policies import registry as policies
from zun.common import policy
from zun.common import utils
import zun.conf
from zun import objects
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
RESOURCE_NAME = 'registry'
COLLECTION_NAME = 'registries'
def _get_registry(registry_ident):
registry = api_utils.get_resource('Registry', registry_ident)
if not registry:
raise exception.RegistryNotFound(registry=registry_ident)
return registry
def check_policy_on_registry(registry, action):
context = pecan.request.context
policy.enforce(context, action, registry, action=action)
class RegistryCollection(collection.Collection):
"""API representation of a collection of registries."""
fields = {
COLLECTION_NAME,
'next'
}
"""A list containing registries objects"""
def __init__(self, **kwargs):
super(RegistryCollection, self).__init__(**kwargs)
self._type = COLLECTION_NAME
@staticmethod
def convert_with_links(rpc_registries, limit, url=None,
**kwargs):
context = pecan.request.context
collection = RegistryCollection()
collection.registries = \
[view.format_registry(context, url, r.as_dict())
for r in rpc_registries]
collection.next = collection.get_next(limit, url=url, **kwargs)
return collection
class RegistryItem(collection.Item):
"""API representation of an registry."""
fields = {
RESOURCE_NAME,
}
@staticmethod
def render_response(rpc_registry):
context = pecan.request.context
url = pecan.request.host_url
item = RegistryItem()
item.registry = view.format_registry(context, url,
rpc_registry.as_dict())
return item
class RegistryController(base.Controller):
"""Controller for Registries."""
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
def get_all(self, **kwargs):
"""Retrieve a list of registries.
"""
context = pecan.request.context
policy_action = policies.REGISTRY % 'get_all'
policy.enforce(context, policy_action, action=policy_action)
return self._get_registries_collection(**kwargs)
def _get_registries_collection(self, **kwargs):
context = pecan.request.context
if utils.is_all_projects(kwargs):
policy_action = policies.REGISTRY % 'get_all_all_projects'
policy.enforce(context, policy_action, action=policy_action)
context.all_projects = True
kwargs.pop('all_projects', None)
limit = api_utils.validate_limit(kwargs.pop('limit', None))
sort_dir = api_utils.validate_sort_dir(kwargs.pop('sort_dir', 'asc'))
sort_key = kwargs.pop('sort_key', 'id')
resource_url = kwargs.pop('resource_url', None)
registry_allowed_filters = ['name', 'domain', 'username',
'project_id', 'user_id']
filters = {}
for filter_key in registry_allowed_filters:
if filter_key in kwargs:
policy_action = policies.REGISTRY % ('get_one:' + filter_key)
context.can(policy_action, might_not_exist=True)
filter_value = kwargs.pop(filter_key)
filters[filter_key] = filter_value
marker_obj = None
marker = kwargs.pop('marker', None)
if marker:
marker_obj = objects.Registry.get_by_uuid(context, marker)
if kwargs:
unknown_params = [str(k) for k in kwargs]
msg = _("Unknown parameters: %s") % ", ".join(unknown_params)
raise exception.InvalidValue(msg)
registries = objects.Registry.list(context,
limit,
marker_obj,
sort_key,
sort_dir,
filters=filters)
return RegistryCollection.convert_with_links(registries, limit,
url=resource_url,
sort_key=sort_key,
sort_dir=sort_dir)
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
def get_one(self, registry_ident, **kwargs):
"""Retrieve information about the given registry.
:param registry_ident: UUID or name of a registry.
"""
context = pecan.request.context
if context.is_admin:
context.all_projects = True
registry = _get_registry(registry_ident)
policy_action = policies.REGISTRY % 'get_one'
check_policy_on_registry(registry.as_dict(), policy_action)
return RegistryItem.render_response(registry)
@pecan.expose('json')
@api_utils.enforce_content_types(['application/json'])
@exception.wrap_pecan_controller_exception
@validation.validated(schema.registry_create)
def post(self, run=False, **registry_dict):
context = pecan.request.context
policy_action = policies.REGISTRY % 'create'
policy.enforce(context, policy_action, action=policy_action)
registry_dict = registry_dict.get(RESOURCE_NAME)
registry_dict['project_id'] = context.project_id
registry_dict['user_id'] = context.user_id
new_registry = objects.Registry(context, **registry_dict)
new_registry.create(context)
# Set the HTTP Location Header
pecan.response.location = link.build_url(COLLECTION_NAME,
new_registry.uuid)
pecan.response.status = 202
return RegistryItem.render_response(new_registry)
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@validation.validated(schema.registry_update)
def patch(self, registry_ident, **registry_dict):
"""Update an existing registry.
:param registry_ident: UUID or name of a registry.
:param registry_dict: a json document to apply to this registry.
"""
registry = _get_registry(registry_ident)
context = pecan.request.context
policy_action = policies.REGISTRY % 'update'
check_policy_on_registry(registry.as_dict(), policy_action)
registry_dict = registry_dict.get(RESOURCE_NAME)
if 'name' in registry_dict:
registry.name = registry_dict['name']
if 'domain' in registry_dict:
registry.domain = registry_dict['domain']
if 'username' in registry_dict:
registry.username = registry_dict['username']
if 'password' in registry_dict:
registry.password = registry_dict['password']
registry.save(context)
return RegistryItem.render_response(registry)
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
def delete(self, registry_ident, **kwargs):
"""Delete a registry.
:param registry_ident: UUID or Name of a registry.
:param force: If True, allow to force delete the registry.
"""
context = pecan.request.context
if context.is_admin:
context.all_projects = True
registry = _get_registry(registry_ident)
policy_action = policies.REGISTRY % 'delete'
check_policy_on_registry(registry.as_dict(), policy_action)
registry.destroy(context)
pecan.response.status = 204

View File

@ -550,3 +550,25 @@ network_name = {
'maxLength': 255,
'pattern': '[a-zA-Z0-9][a-zA-Z0-9_.-]'
}
registry_name = {
'type': ['string'],
'minLength': 2,
'maxLength': 255,
'pattern': '^[a-zA-Z0-9][a-zA-Z0-9_.-]+$'
}
registry_domain = {
'type': ['string'],
'minLength': 1,
}
registry_username = {
'type': ['string'],
'minLength': 1,
}
registry_password = {
'type': ['string'],
'minLength': 1,
}

View File

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zun.api.controllers.v1.schemas import parameter_types
_registry_properties = {
'name': parameter_types.registry_name,
'domain': parameter_types.registry_domain,
'username': parameter_types.registry_username,
'password': parameter_types.registry_password,
}
registry_create = {
'type': 'object',
'properties': {
'registry': {
'type': 'object',
'properties': _registry_properties,
'additionalProperties': False,
'required': ['domain'],
},
},
'required': ['registry'],
'additionalProperties': False,
}
_registry_update_properties = {
'name': parameter_types.registry_name,
'domain': parameter_types.registry_domain,
'username': parameter_types.registry_username,
'password': parameter_types.registry_password,
}
registry_update = {
'type': 'object',
'properties': {
'registry': {
'type': 'object',
'properties': _registry_update_properties,
'additionalProperties': False,
},
},
'required': ['registry'],
'additionalProperties': False,
}

View File

@ -0,0 +1,54 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from zun.api.controllers import link
from zun.common.policies import registry as policies
_basic_keys = (
'uuid',
'user_id',
'project_id',
'name',
'domain',
'username',
'password',
'links',
)
def format_registry(context, url, registry):
def transform(key, value):
if key not in _basic_keys:
return
# strip the key if it is not allowed by policy
policy_action = policies.REGISTRY % ('get_one:%s' % key)
if not context.can(policy_action, fatal=False, might_not_exist=True):
return
if key == 'uuid':
yield ('uuid', value)
if url:
yield ('links', [link.make_link(
'self', url, 'registries', value),
link.make_link(
'bookmark', url,
'registries', value,
bookmark=True)])
elif key == 'password':
yield ('password', '***')
else:
yield (key, value)
return dict(itertools.chain.from_iterable(
transform(k, v) for k, v in registry.items()))

View File

@ -62,10 +62,11 @@ REST_API_VERSION_HISTORY = """REST API Version History:
* 1.27 - Add support for deleting networks
* 1.28 - Add support cpuset
* 1.29 - Add enable_cpu_pinning to compute_node
* 1.30 - Introduce API resource for representing private registry
"""
BASE_VER = '1.1'
CURRENT_MAX_VER = '1.29'
CURRENT_MAX_VER = '1.30'
class Version(object):

View File

@ -230,3 +230,8 @@ user documentation.
----
Add a new attribute 'enable_cpu_pinning' to 'host' resource.
1.30
----
Introduce API endpoint for create/read/update/delete private registry.

View File

@ -22,6 +22,7 @@ from zun.common.policies import image
from zun.common.policies import network
from zun.common.policies import quota
from zun.common.policies import quota_class
from zun.common.policies import registry
from zun.common.policies import zun_service
@ -37,5 +38,6 @@ def list_rules():
container_action.list_rules(),
availability_zone.list_rules(),
quota.list_rules(),
quota_class.list_rules()
quota_class.list_rules(),
registry.list_rules(),
)

View File

@ -0,0 +1,91 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from zun.common.policies import base
REGISTRY = 'registry:%s'
rules = [
policy.DocumentedRuleDefault(
name=REGISTRY % 'create',
check_str=base.RULE_ADMIN_OR_OWNER,
description='Create a new registry.',
operations=[
{
'path': '/v1/registries',
'method': 'POST'
}
]
),
policy.DocumentedRuleDefault(
name=REGISTRY % 'delete',
check_str=base.RULE_ADMIN_OR_OWNER,
description='Delete a registry.',
operations=[
{
'path': '/v1/registries/{registry_ident}',
'method': 'DELETE'
}
]
),
policy.DocumentedRuleDefault(
name=REGISTRY % 'get_one',
check_str=base.RULE_ADMIN_OR_OWNER,
description='Retrieve the details of a specific registry.',
operations=[
{
'path': '/v1/registries/{registry_ident}',
'method': 'GET'
}
]
),
policy.DocumentedRuleDefault(
name=REGISTRY % 'get_all',
check_str=base.RULE_ADMIN_OR_OWNER,
description='Retrieve the details of all registries.',
operations=[
{
'path': '/v1/registries',
'method': 'GET'
}
]
),
policy.DocumentedRuleDefault(
name=REGISTRY % 'get_all_all_projects',
check_str=base.RULE_ADMIN_API,
description='Retrieve the details of all registries across projects.',
operations=[
{
'path': '/v1/registries',
'method': 'GET'
}
]
),
policy.DocumentedRuleDefault(
name=REGISTRY % 'update',
check_str=base.RULE_ADMIN_OR_OWNER,
description='Update a registry.',
operations=[
{
'path': '/v1/registries/{registry_ident}',
'method': 'PATCH'
}
]
),
]
def list_rules():
return rules

View File

@ -26,7 +26,7 @@ from zun.tests.unit.db import base
PATH_PREFIX = '/v1'
CURRENT_VERSION = "container 1.29"
CURRENT_VERSION = "container 1.30"
class FunctionalTest(base.DbTestCase):

View File

@ -28,7 +28,7 @@ class TestRootController(api_base.FunctionalTest):
'default_version':
{'id': 'v1',
'links': [{'href': 'http://localhost/v1/', 'rel': 'self'}],
'max_version': '1.29',
'max_version': '1.30',
'min_version': '1.1',
'status': 'CURRENT'},
'description': 'Zun is an OpenStack project which '
@ -37,7 +37,7 @@ class TestRootController(api_base.FunctionalTest):
'versions': [{'id': 'v1',
'links': [{'href': 'http://localhost/v1/',
'rel': 'self'}],
'max_version': '1.29',
'max_version': '1.30',
'min_version': '1.1',
'status': 'CURRENT'}]}
@ -88,7 +88,11 @@ class TestRootController(api_base.FunctionalTest):
'quota_classes': [{'href': 'http://localhost/v1/quota_classes/',
'rel': 'self'},
{'href': 'http://localhost/quota_classes/',
'rel': 'bookmark'}]
'rel': 'bookmark'}],
'registries': [{'href': 'http://localhost/v1/registries/',
'rel': 'self'},
{'href': 'http://localhost/registries/',
'rel': 'bookmark'}],
}
def make_app(self, paste_file):

View File

@ -0,0 +1,340 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mock import patch
from webtest.app import AppError
from oslo_utils import uuidutils
from zun import objects
from zun.tests.unit.api import base as api_base
from zun.tests.unit.db import utils
from zun.tests.unit.objects import utils as obj_utils
class TestRegistryController(api_base.FunctionalTest):
def test_create_registry(self):
params = ('{"registry": {"name": "MyRegistry", "domain": "test.io",'
'"username": "fake-user", "password": "fake-pass"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
self.assertEqual(1, len(response.json))
r = response.json['registry']
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertEqual('MyRegistry', r.get('name'))
self.assertEqual('test.io', r.get('domain'))
self.assertEqual('fake-user', r.get('username'))
self.assertEqual('***', r.get('password'))
def test_create_registry_domain_not_specified(self):
params = ('{"registry": {"name": "MyRegistry",'
'"username": "fake-user", "password": "fake-pass"}}')
with self.assertRaisesRegex(AppError,
"is a required property"):
self.post('/v1/registries/',
params=params,
content_type='application/json')
def test_create_registry_with_minimum_params(self):
params = ('{"registry": {"domain": "test.io"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
self.assertEqual(1, len(response.json))
r = response.json['registry']
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertIsNone(r.get('name'))
self.assertEqual('test.io', r.get('domain'))
self.assertIsNone(r.get('username'))
self.assertEqual('***', r.get('password'))
def test_create_registry_invalid_long_name(self):
# Long name
params = ('{"registry": {"name": "' + 'i' * 256 + '",'
'"domain": "test.io","username": "fake-user",'
'"password": "fake-pass"}}')
self.assertRaises(AppError, self.post, '/v1/registries/',
params=params, content_type='application/json')
def test_get_all_registries(self):
params = ('{"registry": {"name": "MyRegistry", "domain": "test.io",'
'"username": "fake-user", "password": "fake-pass"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
response = self.get('/v1/registries/')
self.assertEqual(200, response.status_int)
self.assertEqual(2, len(response.json))
r = response.json['registries'][0]
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertEqual('MyRegistry', r.get('name'))
self.assertEqual('test.io', r.get('domain'))
self.assertEqual('fake-user', r.get('username'))
self.assertEqual('***', r.get('password'))
@patch('zun.common.policy.enforce')
@patch('zun.objects.Registry.list')
def test_get_all_registries_all_projects(self, mock_registry_list,
mock_policy):
mock_policy.return_value = True
test_registry = utils.get_test_registry()
registries = [objects.Registry(self.context, **test_registry)]
mock_registry_list.return_value = registries
response = self.get('/v1/registries/?all_projects=1')
mock_registry_list.assert_called_once_with(mock.ANY,
1000, None, 'id', 'asc',
filters={})
context = mock_registry_list.call_args[0][0]
self.assertIs(True, context.all_projects)
self.assertEqual(200, response.status_int)
actual_registries = response.json['registries']
self.assertEqual(1, len(actual_registries))
self.assertEqual(test_registry['uuid'],
actual_registries[0].get('uuid'))
@patch('zun.objects.Registry.list')
def test_get_all_registries_with_pagination_marker(self,
mock_registry_list):
registry_list = []
for id_ in range(4):
test_registry = utils.create_test_registry(
id=id_, uuid=uuidutils.generate_uuid(),
name='registry' + str(id_), context=self.context)
registry_list.append(objects.Registry(self.context,
**test_registry))
mock_registry_list.return_value = registry_list[-1:]
response = self.get('/v1/registries/?limit=3&marker=%s'
% registry_list[2].uuid)
self.assertEqual(200, response.status_int)
actual_registries = response.json['registries']
self.assertEqual(1, len(actual_registries))
self.assertEqual(registry_list[-1].uuid,
actual_registries[0].get('uuid'))
@patch('zun.objects.Registry.list')
def test_get_all_registries_with_filter(self, mock_registry_list):
test_registry = utils.get_test_registry()
registries = [objects.Registry(self.context, **test_registry)]
mock_registry_list.return_value = registries
response = self.get('/v1/registries/?name=fake-name')
mock_registry_list.assert_called_once_with(
mock.ANY, 1000, None, 'id', 'asc', filters={'name': 'fake-name'})
self.assertEqual(200, response.status_int)
actual_registries = response.json['registries']
self.assertEqual(1, len(actual_registries))
self.assertEqual(test_registry['uuid'],
actual_registries[0].get('uuid'))
@patch('zun.objects.Registry.list')
def test_get_all_registries_with_unknown_parameter(
self, mock_registry_list):
test_registry = utils.get_test_registry()
registries = [objects.Registry(self.context, **test_registry)]
mock_registry_list.return_value = registries
response = self.get('/v1/registries/?unknown=fake-name',
expect_errors=True)
mock_registry_list.assert_not_called()
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertEqual("Unknown parameters: unknown",
response.json['errors'][0]['detail'])
def test_get_one(self):
params = ('{"registry": {"name": "MyRegistry", "domain": "test.io",'
'"username": "fake-user", "password": "fake-pass"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
# get by uuid
registry_uuid = response.json['registry']['uuid']
response = self.get('/v1/registries/%s/' % registry_uuid)
self.assertEqual(200, response.status_int)
self.assertEqual(1, len(response.json))
r = response.json['registry']
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertEqual('MyRegistry', r.get('name'))
self.assertEqual('test.io', r.get('domain'))
self.assertEqual('fake-user', r.get('username'))
self.assertEqual('***', r.get('password'))
# get by name
registry_name = response.json['registry']['name']
response = self.get('/v1/registries/%s/' % registry_name)
self.assertEqual(200, response.status_int)
self.assertEqual(1, len(response.json))
r = response.json['registry']
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertEqual('MyRegistry', r.get('name'))
self.assertEqual('test.io', r.get('domain'))
self.assertEqual('fake-user', r.get('username'))
self.assertEqual('***', r.get('password'))
def test_get_one_not_found(self):
response = self.get('/v1/registries/%s/' % 'not-exist',
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertEqual("Registry not-exist could not be found.",
response.json['errors'][0]['detail'])
def test_patch_by_uuid(self):
params = ('{"registry": {"name": "MyRegistry", "domain": "test.io",'
'"username": "fake-user", "password": "fake-pass"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
registry_uuid = response.json['registry']['uuid']
params = {'registry': {'name': 'new-name', 'domain': 'new-domain',
'username': 'new-username', 'password': 'new-pass'}}
response = self.patch_json(
'/registries/%s/' % registry_uuid,
params=params)
self.assertEqual(200, response.status_int)
self.assertEqual(1, len(response.json))
r = response.json['registry']
self.assertIsNotNone(r.get('uuid'))
self.assertIsNotNone(r.get('user_id'))
self.assertIsNotNone(r.get('project_id'))
self.assertEqual('new-name', r.get('name'))
self.assertEqual('new-domain', r.get('domain'))
self.assertEqual('new-username', r.get('username'))
self.assertEqual('***', r.get('password'))
def test_delete_registry_by_uuid(self):
params = ('{"registry": {"name": "MyRegistry", "domain": "test.io",'
'"username": "fake-user", "password": "fake-pass"}}')
response = self.post('/v1/registries/',
params=params,
content_type='application/json')
self.assertEqual(202, response.status_int)
registry_uuid = response.json['registry']['uuid']
response = self.delete('/v1/registries/%s/' % registry_uuid)
self.assertEqual(204, response.status_int)
response = self.get('/v1/registries/%s/' % registry_uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
class TestRegistryEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
rules = dict({rule: 'project_id:non_fake'},
**kwarg.pop('bypass_rules', {}))
self.policy.set_rules(rules)
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule,
response.json['errors'][0]['detail'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
'registry:get_all', self.get, '/v1/registries/',
expect_errors=True)
def test_policy_disallow_get_all_all_projects(self):
self._common_policy_check(
'registry:get_all_all_projects',
self.get, '/v1/registries/?all_projects=1',
expect_errors=True,
bypass_rules={'registry:get_all': 'project_id:fake_project'})
def test_policy_disallow_get_one(self):
registry = obj_utils.create_test_registry(self.context)
self._common_policy_check(
'registry:get_one', self.get,
'/v1/registries/%s/' % registry.uuid,
expect_errors=True)
def test_policy_disallow_update(self):
registry = obj_utils.create_test_registry(self.context)
params = {'registry': {'name': 'newname'}}
self._common_policy_check(
'registry:update', self.patch_json,
'/registries/%s/' % registry.uuid, params,
expect_errors=True)
def test_policy_disallow_create(self):
params = ('{"registry": {"domain": "test.io"}}')
self._common_policy_check(
'registry:create', self.post, '/v1/registries/',
params=params,
content_type='application/json',
expect_errors=True)
def test_policy_disallow_delete(self):
registry = obj_utils.create_test_registry(self.context)
self._common_policy_check(
'registry:delete', self.delete,
'/v1/registries/%s/' % registry.uuid,
expect_errors=True)
def _owner_check(self, rule, func, *args, **kwargs):
self.policy.set_rules({rule: "user_id:%(user_id)s"})
response = func(*args, **kwargs)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
response.json['errors'][0]['detail'])
def test_policy_only_owner_get_one(self):
registry = obj_utils.create_test_registry(self.context,
user_id='another')
self._owner_check("registry:get_one", self.get_json,
'/registries/%s/' % registry.uuid,
expect_errors=True)
def test_policy_only_owner_update(self):
registry = obj_utils.create_test_registry(self.context,
user_id='another')
self._owner_check(
"registry:update", self.patch_json,
'/registries/%s/' % registry.uuid,
{'registry': {'name': 'newname'}},
expect_errors=True)
def test_policy_only_owner_delete(self):
registry = obj_utils.create_test_registry(self.context,
user_id='another')
self._owner_check(
"registry:delete", self.delete,
'/registries/%s/' % registry.uuid,
expect_errors=True)

View File

@ -38,3 +38,27 @@ def get_test_container(context, **kwargs):
for key in db_container:
setattr(container, key, db_container[key])
return container
def create_test_registry(context, **kwargs):
"""Create and return a test registry object.
Create a registry in the DB and return a registry object with
appropriate attributes.
"""
registry = get_test_registry(context, **kwargs)
registry.create(context)
return registry
def get_test_registry(context, **kwargs):
"""Return a test registry object with appropriate attributes.
NOTE: The object leaves the attributes marked as changed, such
that a create() could be used to commit it to the DB.
"""
db_registry = db_utils.get_test_registry(**kwargs)
registry = objects.Registry(context)
for key in db_registry:
setattr(registry, key, db_registry[key])
return registry