identity: Migrate 'application credential' commands to SDK

Change-Id: Iba3fee2672d32266623c6f367beaabe84bd3d24e
This commit is contained in:
Antonia Gaete 2024-06-26 15:54:44 +00:00
parent dd6ac285d5
commit bef8a7a630
4 changed files with 344 additions and 276 deletions

View File

@ -18,6 +18,7 @@
import datetime
import json
import logging
import uuid
from osc_lib.command import command
from osc_lib import exceptions
@ -30,6 +31,30 @@ from openstackclient.identity import common
LOG = logging.getLogger(__name__)
# TODO(stephenfin): Move this to osc_lib since it's useful elsewhere
def is_uuid_like(value) -> bool:
"""Returns validation of a value as a UUID.
:param val: Value to verify
:type val: string
:returns: bool
.. versionchanged:: 1.1.1
Support non-lowercase UUIDs.
"""
try:
formatted_value = (
value.replace('urn:', '')
.replace('uuid:', '')
.strip('{}')
.replace('-', '')
.lower()
)
return str(uuid.UUID(value)).replace('-', '') == formatted_value
except (TypeError, ValueError, AttributeError):
return False
class CreateApplicationCredential(command.ShowOne):
_description = _("Create new application credential")
@ -105,19 +130,16 @@ class CreateApplicationCredential(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
role_ids = []
for role in parsed_args.role:
# A user can only create an application credential for themself,
# not for another user even as an admin, and only on the project to
# which they are currently scoped with a subset of the role
# assignments they have on that project. Don't bother trying to
# look up roles via keystone, just introspect the token.
role_id = common._get_token_resource(
identity_client, "roles", role
)
role_ids.append(role_id)
if is_uuid_like(role):
role_ids.append({'id': role})
else:
role_ids.append({'name': role})
expires_at = None
if parsed_args.expiration:
@ -144,10 +166,10 @@ class CreateApplicationCredential(command.ShowOne):
)
raise exceptions.CommandError(msg)
else:
access_rules = None
access_rules = []
app_cred_manager = identity_client.application_credentials
application_credential = app_cred_manager.create(
application_credential = identity_client.create_application_credential(
user_id,
parsed_args.name,
roles=role_ids,
expires_at=expires_at,
@ -157,14 +179,32 @@ class CreateApplicationCredential(command.ShowOne):
access_rules=access_rules,
)
application_credential._info.pop('links', None)
# Format roles into something sensible
roles = application_credential._info.pop('roles')
msg = ' '.join(r['name'] for r in roles)
application_credential._info['roles'] = msg
if application_credential['roles']:
roles = application_credential['roles']
msg = ' '.join(r['name'] for r in roles)
application_credential['roles'] = msg
return zip(*sorted(application_credential._info.items()))
columns = (
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
'Secret',
)
return (
columns,
(
utils.get_dict_properties(
application_credential,
columns,
)
),
)
class DeleteApplicationCredential(command.Command):
@ -181,15 +221,19 @@ class DeleteApplicationCredential(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
errors = 0
for ac in parsed_args.application_credential:
try:
app_cred = utils.find_resource(
identity_client.application_credentials, ac
app_cred = identity_client.find_application_credential(
user_id, ac
)
identity_client.delete_application_credential(
user_id, app_cred.id
)
identity_client.application_credentials.delete(app_cred.id)
except Exception as e:
errors += 1
LOG.error(
@ -223,16 +267,36 @@ class ListApplicationCredential(command.Lister):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
if parsed_args.user:
user_id = common.find_user(
identity_client, parsed_args.user, parsed_args.user_domain
).id
else:
user_id = None
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
columns = ('ID', 'Name', 'Project ID', 'Description', 'Expires At')
data = identity_client.application_credentials.list(user=user_id)
data = identity_client.application_credentials(user=user_id)
data_formatted = []
for ac in data:
# Format roles into something sensible
roles = ac['roles']
msg = ' '.join(r['name'] for r in roles)
ac['roles'] = msg
data_formatted.append(ac)
columns = (
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
)
return (
columns,
(
@ -241,7 +305,7 @@ class ListApplicationCredential(command.Lister):
columns,
formatters={},
)
for s in data
for s in data_formatted
),
)
@ -259,17 +323,35 @@ class ShowApplicationCredential(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
app_cred = utils.find_resource(
identity_client.application_credentials,
parsed_args.application_credential,
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
app_cred = identity_client.find_application_credential(
user_id, parsed_args.application_credential
)
app_cred._info.pop('links', None)
# Format roles into something sensible
roles = app_cred._info.pop('roles')
roles = app_cred['roles']
msg = ' '.join(r['name'] for r in roles)
app_cred._info['roles'] = msg
app_cred['roles'] = msg
return zip(*sorted(app_cred._info.items()))
columns = (
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
)
return (
columns,
(
utils.get_dict_properties(
app_cred,
columns,
)
),
)

View File

@ -21,13 +21,13 @@ from openstackclient.tests.functional.identity.v3 import common
class ApplicationCredentialTests(common.IdentityTests):
APPLICATION_CREDENTIAL_FIELDS = [
'id',
'name',
'project_id',
'description',
'roles',
'expires_at',
'unrestricted',
'ID',
'Name',
'Project ID',
'Description',
'Roles',
'Expires At',
'Unrestricted',
]
APPLICATION_CREDENTIAL_LIST_HEADERS = [
'ID',

View File

@ -13,37 +13,58 @@
# under the License.
#
import copy
import json
import datetime
from unittest import mock
from unittest.mock import call
from osc_lib import exceptions
from osc_lib import utils
from openstack import exceptions as sdk_exceptions
from openstack.identity.v3 import (
application_credential as _application_credential,
)
from openstack.identity.v3 import role as _role
from openstack.test import fakes as sdk_fakes
from openstackclient.identity.v3 import application_credential
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
class TestApplicationCredential(identity_fakes.TestIdentityv3):
class TestApplicationCredentialCreate(identity_fakes.TestIdentityv3):
columns = (
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
'Secret',
)
def setUp(self):
super().setUp()
identity_manager = self.identity_client
self.app_creds_mock = identity_manager.application_credentials
self.app_creds_mock.reset_mock()
self.roles_mock = identity_manager.roles
self.roles_mock.reset_mock()
self.roles = sdk_fakes.generate_fake_resource(_role.Role)
self.application_credential = sdk_fakes.generate_fake_resource(
resource_type=_application_credential.ApplicationCredential,
roles=[],
)
self.datalist = (
self.application_credential.id,
self.application_credential.name,
self.application_credential.description,
self.application_credential.project_id,
self.application_credential.roles,
self.application_credential.unrestricted,
self.application_credential.access_rules,
self.application_credential.expires_at,
self.application_credential.secret,
)
class TestApplicationCredentialCreate(TestApplicationCredential):
def setUp(self):
super().setUp()
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
self.identity_sdk_client.create_application_credential.return_value = (
self.application_credential
)
# Get the command object to test
@ -52,17 +73,14 @@ class TestApplicationCredentialCreate(TestApplicationCredential):
)
def test_application_credential_create_basic(self):
self.app_creds_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_BASIC),
loaded=True,
)
name = identity_fakes.app_cred_name
name = self.application_credential.name
arglist = [name]
verifylist = [('name', identity_fakes.app_cred_name)]
verifylist = [('name', self.application_credential.name)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
# In base command class ShowOne in cliff, abstract method take_action()
# returns a two-part tuple with a tuple of column names and a tuple of
# data to be shown.
@ -70,68 +88,45 @@ class TestApplicationCredentialCreate(TestApplicationCredential):
# Set expected values
kwargs = {
'secret': None,
'roles': [],
'expires_at': None,
'description': None,
'secret': None,
'unrestricted': False,
'access_rules': None,
'access_rules': [],
}
self.app_creds_mock.create.assert_called_with(name, **kwargs)
self.identity_sdk_client.create_application_credential.assert_called_with(
user_id, name, **kwargs
)
collist = (
'access_rules',
'description',
'expires_at',
'id',
'name',
'project_id',
'roles',
'secret',
'unrestricted',
)
self.assertEqual(collist, columns)
datalist = (
None,
None,
None,
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.app_cred_secret,
False,
)
self.assertEqual(datalist, data)
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
def test_application_credential_create_with_options(self):
name = identity_fakes.app_cred_name
self.app_creds_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_OPTIONS),
loaded=True,
)
name = self.application_credential.name
arglist = [
name,
'--secret',
'moresecuresecret',
'--role',
identity_fakes.role_id,
self.roles.id,
'--expiration',
identity_fakes.app_cred_expires_str,
'2024-01-01T00:00:00',
'--description',
'credential for testing',
]
verifylist = [
('name', identity_fakes.app_cred_name),
('name', self.application_credential.name),
('secret', 'moresecuresecret'),
('role', [identity_fakes.role_id]),
('expiration', identity_fakes.app_cred_expires_str),
('role', [self.roles.id]),
('expiration', '2024-01-01T00:00:00'),
('description', 'credential for testing'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
# In base command class ShowOne in cliff, abstract method take_action()
# returns a two-part tuple with a tuple of column names and a tuple of
# data to be shown.
@ -139,172 +134,119 @@ class TestApplicationCredentialCreate(TestApplicationCredential):
# Set expected values
kwargs = {
'secret': 'moresecuresecret',
'roles': [identity_fakes.role_id],
'expires_at': identity_fakes.app_cred_expires,
'roles': [{'id': self.roles.id}],
'expires_at': datetime.datetime(2024, 1, 1, 0, 0),
'description': 'credential for testing',
'secret': 'moresecuresecret',
'unrestricted': False,
'access_rules': None,
'access_rules': [],
}
self.app_creds_mock.create.assert_called_with(name, **kwargs)
self.identity_sdk_client.create_application_credential.assert_called_with(
user_id, name, **kwargs
)
collist = (
'access_rules',
'description',
'expires_at',
'id',
'name',
'project_id',
'roles',
'secret',
'unrestricted',
)
self.assertEqual(collist, columns)
datalist = (
None,
identity_fakes.app_cred_description,
identity_fakes.app_cred_expires_str,
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.app_cred_secret,
False,
)
self.assertEqual(datalist, data)
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
def test_application_credential_create_with_access_rules_string(self):
name = identity_fakes.app_cred_name
self.app_creds_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_ACCESS_RULES),
loaded=True,
)
name = self.application_credential.name
arglist = [
name,
'--access-rules',
identity_fakes.app_cred_access_rules,
'[{"path": "/v2.1/servers", "method": "GET", "service": "compute"}]',
]
verifylist = [
('name', identity_fakes.app_cred_name),
('access_rules', identity_fakes.app_cred_access_rules),
('name', self.application_credential.name),
(
'access_rules',
'[{"path": "/v2.1/servers", "method": "GET", "service": "compute"}]',
),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'secret': None,
'roles': [],
'expires_at': None,
'description': None,
'secret': None,
'unrestricted': False,
'access_rules': json.loads(identity_fakes.app_cred_access_rules),
'access_rules': [
{
"path": "/v2.1/servers",
"method": "GET",
"service": "compute",
}
],
}
self.app_creds_mock.create.assert_called_with(name, **kwargs)
self.identity_sdk_client.create_application_credential.assert_called_with(
user_id, name, **kwargs
)
collist = (
'access_rules',
'description',
'expires_at',
'id',
'name',
'project_id',
'roles',
'secret',
'unrestricted',
)
self.assertEqual(collist, columns)
datalist = (
identity_fakes.app_cred_access_rules,
None,
None,
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.app_cred_secret,
False,
)
self.assertEqual(datalist, data)
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
@mock.patch('openstackclient.identity.v3.application_credential.json.load')
@mock.patch('openstackclient.identity.v3.application_credential.open')
def test_application_credential_create_with_access_rules_file(
self, _, mock_json_load
):
mock_json_load.return_value = identity_fakes.app_cred_access_rules
name = identity_fakes.app_cred_name
self.app_creds_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_ACCESS_RULES),
loaded=True,
)
mock_json_load.return_value = '/tmp/access_rules.json'
name = self.application_credential.name
arglist = [
name,
'--access-rules',
identity_fakes.app_cred_access_rules_path,
'/tmp/access_rules.json',
]
verifylist = [
('name', identity_fakes.app_cred_name),
('access_rules', identity_fakes.app_cred_access_rules_path),
('name', self.application_credential.name),
('access_rules', '/tmp/access_rules.json'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'secret': None,
'roles': [],
'expires_at': None,
'description': None,
'secret': None,
'unrestricted': False,
'access_rules': identity_fakes.app_cred_access_rules,
'access_rules': '/tmp/access_rules.json',
}
self.app_creds_mock.create.assert_called_with(name, **kwargs)
collist = (
'access_rules',
'description',
'expires_at',
'id',
'name',
'project_id',
'roles',
'secret',
'unrestricted',
self.identity_sdk_client.create_application_credential.assert_called_with(
user_id, name, **kwargs
)
self.assertEqual(collist, columns)
datalist = (
identity_fakes.app_cred_access_rules,
None,
None,
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.app_cred_secret,
False,
)
self.assertEqual(datalist, data)
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
class TestApplicationCredentialDelete(TestApplicationCredential):
class TestApplicationCredentialDelete(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
# This is the return value for utils.find_resource()
self.app_creds_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_BASIC),
loaded=True,
self.application_credential = sdk_fakes.generate_fake_resource(
resource_type=_application_credential.ApplicationCredential,
roles=[],
)
self.identity_sdk_client.find_application_credential.return_value = (
self.application_credential
)
self.identity_sdk_client.delete_application_credential.return_value = (
None
)
self.app_creds_mock.delete.return_value = None
# Get the command object to test
self.cmd = application_credential.DeleteApplicationCredential(
@ -313,26 +255,31 @@ class TestApplicationCredentialDelete(TestApplicationCredential):
def test_application_credential_delete(self):
arglist = [
identity_fakes.app_cred_id,
self.application_credential.id,
]
verifylist = [
('application_credential', [self.application_credential.id])
]
verifylist = [('application_credential', [identity_fakes.app_cred_id])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
result = self.cmd.take_action(parsed_args)
self.app_creds_mock.delete.assert_called_with(
identity_fakes.app_cred_id,
self.identity_sdk_client.delete_application_credential.assert_called_with(
user_id,
self.application_credential.id,
)
self.assertIsNone(result)
@mock.patch.object(utils, 'find_resource')
def test_delete_multi_app_creds_with_exception(self, find_mock):
find_mock.side_effect = [
self.app_creds_mock.get.return_value,
exceptions.CommandError,
def test_delete_multi_app_creds_with_exception(self):
self.identity_sdk_client.find_application_credential.side_effect = [
self.application_credential,
sdk_exceptions.NotFoundException,
]
arglist = [
identity_fakes.app_cred_id,
self.application_credential.id,
'nonexistent_app_cred',
]
verifylist = [
@ -340,6 +287,9 @@ class TestApplicationCredentialDelete(TestApplicationCredential):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
@ -348,27 +298,32 @@ class TestApplicationCredentialDelete(TestApplicationCredential):
'1 of 2 application credentials failed to' ' delete.', str(e)
)
find_mock.assert_any_call(
self.app_creds_mock, identity_fakes.app_cred_id
)
find_mock.assert_any_call(self.app_creds_mock, 'nonexistent_app_cred')
calls = []
for a in arglist:
calls.append(call(user_id, a))
self.assertEqual(2, find_mock.call_count)
self.app_creds_mock.delete.assert_called_once_with(
identity_fakes.app_cred_id
self.identity_sdk_client.find_application_credential.assert_has_calls(
calls
)
self.assertEqual(
2, self.identity_sdk_client.find_application_credential.call_count
)
self.identity_sdk_client.delete_application_credential.assert_called_once_with(
user_id, self.application_credential.id
)
class TestApplicationCredentialList(TestApplicationCredential):
class TestApplicationCredentialList(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
self.app_creds_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_BASIC),
loaded=True,
),
self.application_credential = sdk_fakes.generate_fake_resource(
resource_type=_application_credential.ApplicationCredential,
roles=[],
)
self.identity_sdk_client.application_credentials.return_value = [
self.application_credential
]
# Get the command object to test
@ -381,35 +336,54 @@ class TestApplicationCredentialList(TestApplicationCredential):
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
# In base command class Lister in cliff, abstract method take_action()
# returns a tuple containing the column names and an iterable
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
self.app_creds_mock.list.assert_called_with(user=None)
self.identity_sdk_client.application_credentials.assert_called_with(
user=user_id
)
collist = ('ID', 'Name', 'Project ID', 'Description', 'Expires At')
collist = (
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
)
self.assertEqual(collist, columns)
datalist = (
(
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
None,
None,
self.application_credential.id,
self.application_credential.name,
self.application_credential.description,
self.application_credential.project_id,
self.application_credential.roles,
self.application_credential.unrestricted,
self.application_credential.access_rules,
self.application_credential.expires_at,
),
)
self.assertEqual(datalist, tuple(data))
class TestApplicationCredentialShow(TestApplicationCredential):
class TestApplicationCredentialShow(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
self.app_creds_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.APP_CRED_BASIC),
loaded=True,
self.application_credential = sdk_fakes.generate_fake_resource(
resource_type=_application_credential.ApplicationCredential,
roles=[],
)
self.identity_sdk_client.find_application_credential.return_value = (
self.application_credential
)
# Get the command object to test
@ -419,41 +393,44 @@ class TestApplicationCredentialShow(TestApplicationCredential):
def test_application_credential_show(self):
arglist = [
identity_fakes.app_cred_id,
self.application_credential.id,
]
verifylist = [
('application_credential', identity_fakes.app_cred_id),
('application_credential', self.application_credential.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
# In base command class ShowOne in cliff, abstract method take_action()
# returns a two-part tuple with a tuple of column names and a tuple of
# data to be shown.
columns, data = self.cmd.take_action(parsed_args)
self.app_creds_mock.get.assert_called_with(identity_fakes.app_cred_id)
self.identity_sdk_client.find_application_credential.assert_called_with(
user_id, self.application_credential.id
)
collist = (
'access_rules',
'description',
'expires_at',
'id',
'name',
'project_id',
'roles',
'secret',
'unrestricted',
'ID',
'Name',
'Description',
'Project ID',
'Roles',
'Unrestricted',
'Access Rules',
'Expires At',
)
self.assertEqual(collist, columns)
datalist = (
None,
None,
None,
identity_fakes.app_cred_id,
identity_fakes.app_cred_name,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.app_cred_secret,
False,
self.application_credential.id,
self.application_credential.name,
self.application_credential.description,
self.application_credential.project_id,
self.application_credential.roles,
self.application_credential.unrestricted,
self.application_credential.access_rules,
self.application_credential.expires_at,
)
self.assertEqual(datalist, data)

View File

@ -0,0 +1,9 @@
---
features:
- |
The following commands have been migrated to SDK:
- ``application credential create``
- ``application credential delete``
- ``application credential list``
- ``application credential show``