Implement network rbac create and delete commands

Add "network rbac create" and "network rbac delete" commands
and also add unit tests, functional tests, docs and release
note for them.

Change-Id: I5fd58342f2deaa9bae7717412a942a21bbd7d045
Partially-Implements: blueprint neutron-client-rbac
This commit is contained in:
Huanxuan Ao 2016-07-25 17:37:41 +08:00
parent a8880e8b34
commit 13bc3793e0
8 changed files with 528 additions and 5 deletions

View File

@ -8,6 +8,68 @@ to network resources for specific projects.
Network v2 Network v2
network rbac create
-------------------
Create network RBAC policy
.. program:: network rbac create
.. code:: bash
os network rbac create
--type <type>
--action <action>
--target-project <target-project> [--target-project-domain <target-project-domain>]
[--project <project> [--project-domain <project-domain>]]
<rbac-policy>
.. option:: --type <type>
Type of the object that RBAC policy affects ("qos_policy" or "network") (required)
.. option:: --action <action>
Action for the RBAC policy ("access_as_external" or "access_as_shared") (required)
.. option:: --target-project <target-project>
The project to which the RBAC policy will be enforced (name or ID) (required)
.. option:: --target-project-domain <target-project-domain>
Domain the target project belongs to (name or ID).
This can be used in case collisions between project names exist.
.. option:: --project <project>
The owner project (name or ID)
.. option:: --project-domain <project-domain>
Domain the project belongs to (name or ID).
This can be used in case collisions between project names exist.
.. _network_rbac_create-rbac-policy:
.. describe:: <rbac-object>
The object to which this RBAC policy affects (name or ID for network objects, ID only for QoS policy objects)
network rbac delete
-------------------
Delete network RBAC policy(s)
.. program:: network rbac delete
.. code:: bash
os network rbac delete
<rbac-policy> [<rbac-policy> ...]
.. _network_rbac_delete-rbac-policy:
.. describe:: <rbac-policy>
RBAC policy(s) to delete (ID only)
network rbac list network rbac list
----------------- -----------------

View File

@ -38,7 +38,7 @@ Delete example(s)
.. describe:: <example> .. describe:: <example>
Example to delete (name or ID) Example(s) to delete (name or ID)
example list example list
------------ ------------

View File

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from functional.common import test
class NetworkRBACTests(test.TestCase):
"""Functional tests for network rbac. """
NET_NAME = uuid.uuid4().hex
OBJECT_ID = None
ID = None
HEADERS = ['ID']
FIELDS = ['id']
@classmethod
def setUpClass(cls):
opts = cls.get_opts(cls.FIELDS)
raw_output = cls.openstack('network create ' + cls.NET_NAME + opts)
cls.OBJECT_ID = raw_output.strip('\n')
opts = cls.get_opts(['id', 'object_id'])
raw_output = cls.openstack('network rbac create ' +
cls.OBJECT_ID +
' --action access_as_shared' +
' --target-project admin' +
' --type network' + opts)
cls.ID, object_id, rol = tuple(raw_output.split('\n'))
cls.assertOutput(cls.OBJECT_ID, object_id)
@classmethod
def tearDownClass(cls):
raw_output = cls.openstack('network rbac delete ' + cls.ID)
cls.assertOutput('', raw_output)
raw_output = cls.openstack('network delete ' + cls.OBJECT_ID)
cls.assertOutput('', raw_output)
def test_network_rbac_list(self):
opts = self.get_opts(self.HEADERS)
raw_output = self.openstack('network rbac list' + opts)
self.assertIn(self.ID, raw_output)
def test_network_rbac_show(self):
opts = self.get_opts(self.FIELDS)
raw_output = self.openstack('network rbac show ' + self.ID + opts)
self.assertEqual(self.ID + "\n", raw_output)

View File

@ -13,10 +13,17 @@
"""RBAC action implementations""" """RBAC action implementations"""
import logging
from osc_lib.command import command from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils from osc_lib import utils
from openstackclient.i18n import _ from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
LOG = logging.getLogger(__name__)
def _get_columns(item): def _get_columns(item):
@ -30,6 +37,131 @@ def _get_columns(item):
return tuple(sorted(columns)) return tuple(sorted(columns))
def _get_attrs(client_manager, parsed_args):
attrs = {}
attrs['object_type'] = parsed_args.type
attrs['action'] = parsed_args.action
network_client = client_manager.network
if parsed_args.type == 'network':
object_id = network_client.find_network(
parsed_args.rbac_object, ignore_missing=False).id
if parsed_args.type == 'qos_policy':
# TODO(Huanxuan Ao): Support finding a object ID by obejct name
# after qos policy finding supported in SDK.
object_id = parsed_args.rbac_object
attrs['object_id'] = object_id
identity_client = client_manager.identity
project_id = identity_common.find_project(
identity_client,
parsed_args.target_project,
parsed_args.target_project_domain,
).id
attrs['target_tenant'] = project_id
if parsed_args.project is not None:
project_id = identity_common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
return attrs
class CreateNetworkRBAC(command.ShowOne):
"""Create network RBAC policy"""
def get_parser(self, prog_name):
parser = super(CreateNetworkRBAC, self).get_parser(prog_name)
parser.add_argument(
'rbac_object',
metavar="<rbac-object>",
help=_("The object to which this RBAC policy affects (name or "
"ID for network objects, ID only for QoS policy objects)")
)
parser.add_argument(
'--type',
metavar="<type>",
required=True,
choices=['qos_policy', 'network'],
help=_('Type of the object that RBAC policy '
'affects ("qos_policy" or "network")')
)
parser.add_argument(
'--action',
metavar="<action>",
required=True,
choices=['access_as_external', 'access_as_shared'],
help=_('Action for the RBAC policy '
'("access_as_external" or "access_as_shared")')
)
parser.add_argument(
'--target-project',
required=True,
metavar="<target-project>",
help=_('The project to which the RBAC policy '
'will be enforced (name or ID)')
)
parser.add_argument(
'--target-project-domain',
metavar='<target-project-domain>',
help=_('Domain the target project belongs to (name or ID). '
'This can be used in case collisions between project names '
'exist.'),
)
parser.add_argument(
'--project',
metavar="<project>",
help=_('The owner project (name or ID)')
)
identity_common.add_project_domain_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
attrs = _get_attrs(self.app.client_manager, parsed_args)
obj = client.create_rbac_policy(**attrs)
columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return columns, data
class DeleteNetworkRBAC(command.Command):
"""Delete network RBAC policy(s)"""
def get_parser(self, prog_name):
parser = super(DeleteNetworkRBAC, self).get_parser(prog_name)
parser.add_argument(
'rbac_policy',
metavar="<rbac-policy>",
nargs='+',
help=_("RBAC policy(s) to delete (ID only)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
result = 0
for rbac in parsed_args.rbac_policy:
try:
obj = client.find_rbac_policy(rbac, ignore_missing=False)
client.delete_rbac_policy(obj)
except Exception as e:
result += 1
LOG.error(_("Failed to delete RBAC policy with "
"ID '%(rbac)s': %(e)s"),
{'rbac': rbac, 'e': e})
if result > 0:
total = len(parsed_args.rbac_policy)
msg = (_("%(result)s of %(total)s RBAC policies failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListNetworkRBAC(command.Lister): class ListNetworkRBAC(command.Lister):
"""List network RBAC policies""" """List network RBAC policies"""

View File

@ -541,6 +541,25 @@ class FakeNetworkRBAC(object):
return rbac_policies return rbac_policies
@staticmethod
def get_network_rbacs(rbac_policies=None, count=2):
"""Get an iterable MagicMock object with a list of faked rbac policies.
If rbac policies list is provided, then initialize the Mock object
with the list. Otherwise create one.
:param List rbac_policies:
A list of FakeResource objects faking rbac policies
:param int count:
The number of rbac policies to fake
:return:
An iterable Mock object with side_effect set to a list of faked
rbac policies
"""
if rbac_policies is None:
rbac_policies = FakeNetworkRBAC.create_network_rbacs(count)
return mock.MagicMock(side_effect=rbac_policies)
class FakeRouter(object): class FakeRouter(object):
"""Fake one or more routers.""" """Fake one or more routers."""

View File

@ -12,8 +12,12 @@
# #
import mock import mock
from mock import call
from osc_lib import exceptions
from openstackclient.network.v2 import network_rbac from openstackclient.network.v2 import network_rbac
from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
from openstackclient.tests.network.v2 import fakes as network_fakes from openstackclient.tests.network.v2 import fakes as network_fakes
from openstackclient.tests import utils as tests_utils from openstackclient.tests import utils as tests_utils
@ -25,6 +29,252 @@ class TestNetworkRBAC(network_fakes.TestNetworkV2):
# Get a shortcut to the network client # Get a shortcut to the network client
self.network = self.app.client_manager.network self.network = self.app.client_manager.network
# Get a shortcut to the ProjectManager Mock
self.projects_mock = self.app.client_manager.identity.projects
class TestCreateNetworkRBAC(TestNetworkRBAC):
network_object = network_fakes.FakeNetwork.create_one_network()
project = identity_fakes_v3.FakeProject.create_one_project()
rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
attrs={'tenant_id': project.id,
'target_tenant': project.id,
'object_id': network_object.id}
)
columns = (
'action',
'id',
'object_id',
'object_type',
'project_id',
'target_project',
)
data = [
rbac_policy.action,
rbac_policy.id,
rbac_policy.object_id,
rbac_policy.object_type,
rbac_policy.tenant_id,
rbac_policy.target_tenant,
]
def setUp(self):
super(TestCreateNetworkRBAC, self).setUp()
# Get the command object to test
self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace)
self.network.create_rbac_policy = mock.Mock(
return_value=self.rbac_policy)
self.network.find_network = mock.Mock(
return_value=self.network_object)
self.projects_mock.get.return_value = self.project
def test_network_rbac_create_no_type(self):
arglist = [
'--action', self.rbac_policy.action,
self.rbac_policy.object_id,
]
verifylist = [
('action', self.rbac_policy.action),
('rbac_policy', self.rbac_policy.id),
]
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_network_rbac_create_no_action(self):
arglist = [
'--type', self.rbac_policy.object_type,
self.rbac_policy.object_id,
]
verifylist = [
('type', self.rbac_policy.object_type),
('rbac_policy', self.rbac_policy.id),
]
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_network_rbac_create_invalid_type(self):
arglist = [
'--action', self.rbac_policy.action,
'--type', 'invalid_type',
'--target-project', self.rbac_policy.target_tenant,
self.rbac_policy.object_id,
]
verifylist = [
('action', self.rbac_policy.action),
('type', 'invalid_type'),
('target-project', self.rbac_policy.target_tenant),
('rbac_policy', self.rbac_policy.id),
]
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_network_rbac_create_invalid_action(self):
arglist = [
'--type', self.rbac_policy.object_type,
'--action', 'invalid_action',
'--target-project', self.rbac_policy.target_tenant,
self.rbac_policy.object_id,
]
verifylist = [
('type', self.rbac_policy.object_type),
('action', 'invalid_action'),
('target-project', self.rbac_policy.target_tenant),
('rbac_policy', self.rbac_policy.id),
]
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_network_rbac_create(self):
arglist = [
'--type', self.rbac_policy.object_type,
'--action', self.rbac_policy.action,
'--target-project', self.rbac_policy.target_tenant,
self.rbac_policy.object_id,
]
verifylist = [
('type', self.rbac_policy.object_type),
('action', self.rbac_policy.action),
('target_project', self.rbac_policy.target_tenant),
('rbac_object', self.rbac_policy.object_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.network.create_rbac_policy.assert_called_with(**{
'object_id': self.rbac_policy.object_id,
'object_type': self.rbac_policy.object_type,
'action': self.rbac_policy.action,
'target_tenant': self.rbac_policy.target_tenant,
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
def test_network_rbac_create_all_options(self):
arglist = [
'--type', self.rbac_policy.object_type,
'--action', self.rbac_policy.action,
'--target-project', self.rbac_policy.target_tenant,
'--project', self.rbac_policy.tenant_id,
'--project-domain', self.project.domain_id,
'--target-project-domain', self.project.domain_id,
self.rbac_policy.object_id,
]
verifylist = [
('type', self.rbac_policy.object_type),
('action', self.rbac_policy.action),
('target_project', self.rbac_policy.target_tenant),
('project', self.rbac_policy.tenant_id),
('project_domain', self.project.domain_id),
('target_project_domain', self.project.domain_id),
('rbac_object', self.rbac_policy.object_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.network.create_rbac_policy.assert_called_with(**{
'object_id': self.rbac_policy.object_id,
'object_type': self.rbac_policy.object_type,
'action': self.rbac_policy.action,
'target_tenant': self.rbac_policy.target_tenant,
'tenant_id': self.rbac_policy.tenant_id,
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
class TestDeleteNetworkRBAC(TestNetworkRBAC):
rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2)
def setUp(self):
super(TestDeleteNetworkRBAC, self).setUp()
self.network.delete_rbac_policy = mock.Mock(return_value=None)
self.network.find_rbac_policy = (
network_fakes.FakeNetworkRBAC.get_network_rbacs(
rbac_policies=self.rbac_policies)
)
# Get the command object to test
self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace)
def test_network_rbac_delete(self):
arglist = [
self.rbac_policies[0].id,
]
verifylist = [
('rbac_policy', [self.rbac_policies[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.find_rbac_policy.assert_called_once_with(
self.rbac_policies[0].id, ignore_missing=False)
self.network.delete_rbac_policy.assert_called_once_with(
self.rbac_policies[0])
self.assertIsNone(result)
def test_multi_network_rbacs_delete(self):
arglist = []
verifylist = []
for r in self.rbac_policies:
arglist.append(r.id)
verifylist = [
('rbac_policy', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for r in self.rbac_policies:
calls.append(call(r))
self.network.delete_rbac_policy.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_network_policies_delete_with_exception(self):
arglist = [
self.rbac_policies[0].id,
'unexist_rbac_policy',
]
verifylist = [
('rbac_policy',
[self.rbac_policies[0].id, 'unexist_rbac_policy']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [self.rbac_policies[0], exceptions.CommandError]
self.network.find_rbac_policy = (
mock.MagicMock(side_effect=find_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e))
self.network.find_rbac_policy.assert_any_call(
self.rbac_policies[0].id, ignore_missing=False)
self.network.find_rbac_policy.assert_any_call(
'unexist_rbac_policy', ignore_missing=False)
self.network.delete_rbac_policy.assert_called_once_with(
self.rbac_policies[0]
)
class TestListNetworkRABC(TestNetworkRBAC): class TestListNetworkRABC(TestNetworkRBAC):
@ -107,16 +357,18 @@ class TestShowNetworkRBAC(TestNetworkRBAC):
def test_network_rbac_show_all_options(self): def test_network_rbac_show_all_options(self):
arglist = [ arglist = [
self.rbac_policy.object_id, self.rbac_policy.id,
]
verifylist = [
('rbac_policy', self.rbac_policy.id),
] ]
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples # DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args) columns, data = self.cmd.take_action(parsed_args)
self.network.find_rbac_policy.assert_called_with( self.network.find_rbac_policy.assert_called_with(
self.rbac_policy.object_id, ignore_missing=False self.rbac_policy.id, ignore_missing=False
) )
self.assertEqual(self.columns, columns) self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data)) self.assertEqual(self.data, list(data))

View File

@ -1,4 +1,5 @@
--- ---
features: features:
- Add ``network rbac list`` and ``network rbac show`` commands. - Add ``network rbac list``, ``network rbac show``, ``network rbac create``
and ``network rbac delete`` commands.
[Blueprint `neutron-client-rbac <https://blueprints.launchpad.net/python-openstackclient/+spec/neutron-client-rbac>`_] [Blueprint `neutron-client-rbac <https://blueprints.launchpad.net/python-openstackclient/+spec/neutron-client-rbac>`_]

View File

@ -361,6 +361,8 @@ openstack.network.v2 =
network_set = openstackclient.network.v2.network:SetNetwork network_set = openstackclient.network.v2.network:SetNetwork
network_show = openstackclient.network.v2.network:ShowNetwork network_show = openstackclient.network.v2.network:ShowNetwork
network_rbac_create = openstackclient.network.v2.network_rbac:CreateNetworkRBAC
network_rbac_delete = openstackclient.network.v2.network_rbac:DeleteNetworkRBAC
network_rbac_list = openstackclient.network.v2.network_rbac:ListNetworkRBAC network_rbac_list = openstackclient.network.v2.network_rbac:ListNetworkRBAC
network_rbac_show = openstackclient.network.v2.network_rbac:ShowNetworkRBAC network_rbac_show = openstackclient.network.v2.network_rbac:ShowNetworkRBAC