Assign roles to group for a region
This patch provides the fuctionality of assigning roles to group for a particular region. Change-Id: Ie1a246d46fffe3d8976546c2e276efd93a3b424d
This commit is contained in:
parent
cf78b0eaf5
commit
b400f52e9d
@ -352,6 +352,22 @@ def add_to_parser(service_sub):
|
|||||||
'datafile', type=argparse.FileType('r'),
|
'datafile', type=argparse.FileType('r'),
|
||||||
help='<data file with group role(s) assignment JSON>')
|
help='<data file with group role(s) assignment JSON>')
|
||||||
|
|
||||||
|
# assign group region roles
|
||||||
|
parser_assign_group_region_roles = subparsers.add_parser(
|
||||||
|
'assign_group_region_roles',
|
||||||
|
help='[<"X-RANGER-Client" '
|
||||||
|
'header>] <group id> <region_id> '
|
||||||
|
'<data file with group role(s) assignment JSON>')
|
||||||
|
parser_assign_group_region_roles.add_argument(
|
||||||
|
'client', **cli_common.ORM_CLIENT_KWARGS)
|
||||||
|
parser_assign_group_region_roles.add_argument(
|
||||||
|
'groupid', type=str, help='<groupid id>')
|
||||||
|
parser_assign_group_region_roles.add_argument(
|
||||||
|
'regionid', type=str, help='<regionid id>')
|
||||||
|
parser_assign_group_region_roles.add_argument(
|
||||||
|
'datafile', type=argparse.FileType('r'),
|
||||||
|
help='<data file with group role(s) assignment JSON>')
|
||||||
|
|
||||||
# unassign group roles
|
# unassign group roles
|
||||||
parser_unassign_group_role = subparsers.add_parser(
|
parser_unassign_group_role = subparsers.add_parser(
|
||||||
'unassign_group_role',
|
'unassign_group_role',
|
||||||
@ -536,6 +552,9 @@ def cmd_details(args):
|
|||||||
return requests.get, 'groups/%s' % param
|
return requests.get, 'groups/%s' % param
|
||||||
elif args.subcmd == 'assign_group_roles':
|
elif args.subcmd == 'assign_group_roles':
|
||||||
return requests.post, 'groups/%s/roles' % args.groupid
|
return requests.post, 'groups/%s/roles' % args.groupid
|
||||||
|
elif args.subcmd == 'assign_group_region_roles':
|
||||||
|
return requests.post, 'groups/%s/regions/%s/roles' % (
|
||||||
|
args.groupid, args.regionid)
|
||||||
elif args.subcmd == 'unassign_group_role':
|
elif args.subcmd == 'unassign_group_role':
|
||||||
if args.customer and args.domain:
|
if args.customer and args.domain:
|
||||||
print("--customer and --domain cannot be specified "
|
print("--customer and --domain cannot be specified "
|
||||||
|
@ -0,0 +1,69 @@
|
|||||||
|
from oslo_db.exception import DBDuplicateEntry
|
||||||
|
from pecan import request, rest
|
||||||
|
from wsmeext.pecan import wsexpose
|
||||||
|
|
||||||
|
from orm.common.orm_common.utils import api_error_utils as err_utils
|
||||||
|
from orm.common.orm_common.utils import utils
|
||||||
|
from orm.services.customer_manager.cms_rest.logger import get_logger
|
||||||
|
from orm.services.customer_manager.cms_rest.logic.error_base import ErrorStatus
|
||||||
|
from orm.services.customer_manager.cms_rest.logic.group_logic import GroupLogic
|
||||||
|
from orm.services.customer_manager.cms_rest.model.GroupModels import \
|
||||||
|
RoleAssignment, RoleResultWrapper
|
||||||
|
from orm.services.customer_manager.cms_rest.utils import authentication
|
||||||
|
|
||||||
|
LOG = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RegionRoleController(rest.RestController):
|
||||||
|
|
||||||
|
@wsexpose(str, str, str, rest_content_types='json')
|
||||||
|
def get(self, group_id, region):
|
||||||
|
return "This is groups region role controller group id:{0} " \
|
||||||
|
"region:{1}".format(group_id, region)
|
||||||
|
|
||||||
|
@wsexpose(RoleResultWrapper, str, str, body=[RoleAssignment],
|
||||||
|
rest_content_types='json', status_code=200)
|
||||||
|
def post(self, group_id, region, role_assignments):
|
||||||
|
LOG.info("RegionRoleController - Assign Roles to group id [{0}] for "
|
||||||
|
"region [{1}] roles [{2}]".format(
|
||||||
|
group_id, region, str(role_assignments)))
|
||||||
|
authentication.authorize(request, 'groups:assign_region_role')
|
||||||
|
try:
|
||||||
|
group_logic = GroupLogic()
|
||||||
|
result = group_logic.assign_roles(group_id,
|
||||||
|
role_assignments,
|
||||||
|
request.transaction_id,
|
||||||
|
region)
|
||||||
|
LOG.info("RegionRoleController - Roles assigned: " + str(result))
|
||||||
|
|
||||||
|
event_details = 'Group {} - region roles assigned'.format(group_id)
|
||||||
|
utils.audit_trail('assigned group region roles',
|
||||||
|
request.transaction_id,
|
||||||
|
request.headers,
|
||||||
|
group_id,
|
||||||
|
event_details=event_details)
|
||||||
|
|
||||||
|
except DBDuplicateEntry as exception:
|
||||||
|
LOG.log_exception(
|
||||||
|
"DBDuplicateEntry-Group Region Roles already assigned.",
|
||||||
|
exception)
|
||||||
|
raise err_utils.get_error(
|
||||||
|
request.transaction_id,
|
||||||
|
status_code=409,
|
||||||
|
message='Duplicate Entry-Group Region Roles already assigned.',
|
||||||
|
error_details=exception.message)
|
||||||
|
|
||||||
|
except ErrorStatus as exception:
|
||||||
|
LOG.log_exception(
|
||||||
|
"ErrorStatus - Failed to assign roles for region.", exception)
|
||||||
|
raise err_utils.get_error(request.transaction_id,
|
||||||
|
message=exception.message,
|
||||||
|
status_code=exception.status_code)
|
||||||
|
except Exception as exception:
|
||||||
|
LOG.log_exception(
|
||||||
|
"Exception - Failed in assign roles for region.", exception)
|
||||||
|
raise err_utils.get_error(request.transaction_id,
|
||||||
|
status_code=500,
|
||||||
|
error_details=str(exception))
|
||||||
|
|
||||||
|
return result
|
@ -4,8 +4,10 @@ from wsmeext.pecan import wsexpose
|
|||||||
|
|
||||||
from orm.common.orm_common.utils import api_error_utils as err_utils
|
from orm.common.orm_common.utils import api_error_utils as err_utils
|
||||||
from orm.common.orm_common.utils import utils
|
from orm.common.orm_common.utils import utils
|
||||||
from orm.services.customer_manager.cms_rest.controllers.v1.orm.group.region_users \
|
from orm.services.customer_manager.cms_rest.controllers.v1.orm.group.\
|
||||||
import RegionUserController
|
region_roles import RegionRoleController
|
||||||
|
from orm.services.customer_manager.cms_rest.controllers.v1.orm.group.\
|
||||||
|
region_users import RegionUserController
|
||||||
from orm.services.customer_manager.cms_rest.logger import get_logger
|
from orm.services.customer_manager.cms_rest.logger import get_logger
|
||||||
from orm.services.customer_manager.cms_rest.logic.error_base import ErrorStatus
|
from orm.services.customer_manager.cms_rest.logic.error_base import ErrorStatus
|
||||||
from orm.services.customer_manager.cms_rest.logic.group_logic import GroupLogic
|
from orm.services.customer_manager.cms_rest.logic.group_logic import GroupLogic
|
||||||
@ -19,6 +21,7 @@ LOG = get_logger(__name__)
|
|||||||
class RegionController(rest.RestController):
|
class RegionController(rest.RestController):
|
||||||
|
|
||||||
users = RegionUserController()
|
users = RegionUserController()
|
||||||
|
roles = RegionRoleController()
|
||||||
|
|
||||||
@wsexpose([str], str, str, rest_content_types='json')
|
@wsexpose([str], str, str, rest_content_types='json')
|
||||||
def get(self, group_id, region_id):
|
def get(self, group_id, region_id):
|
||||||
|
@ -109,7 +109,7 @@ class RoleController(rest.RestController):
|
|||||||
status_code=500,
|
status_code=500,
|
||||||
error_details=str(exception))
|
error_details=str(exception))
|
||||||
|
|
||||||
@wsexpose(RoleResult, str, str, str, str, rest_content_types='json')
|
@wsexpose([RoleResult], str, str, str, str, rest_content_types='json')
|
||||||
def get_all(self, group_id, region=None, customer=None, domain=None):
|
def get_all(self, group_id, region=None, customer=None, domain=None):
|
||||||
LOG.info("RoleController - GetRolelist")
|
LOG.info("RoleController - GetRolelist")
|
||||||
authentication.authorize(request, 'groups:get_all_roles')
|
authentication.authorize(request, 'groups:get_all_roles')
|
||||||
|
@ -40,11 +40,9 @@ class GroupsCustomerRoleRecord:
|
|||||||
str(groups_customer_role), exception)
|
str(groups_customer_role), exception)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def get_customer_role_by_keys(self,
|
def get_customer_roles_by_region(self,
|
||||||
group_uuid,
|
group_uuid,
|
||||||
role_id,
|
region_name):
|
||||||
region_name,
|
|
||||||
customer_id):
|
|
||||||
region_record = RegionRecord(self.session)
|
region_record = RegionRecord(self.session)
|
||||||
region_id = region_record.get_region_id_from_name(region_name)
|
region_id = region_record.get_region_id_from_name(region_name)
|
||||||
if region_id is None:
|
if region_id is None:
|
||||||
@ -54,16 +52,13 @@ class GroupsCustomerRoleRecord:
|
|||||||
try:
|
try:
|
||||||
group = self.session.query(GroupsCustomerRole).filter(
|
group = self.session.query(GroupsCustomerRole).filter(
|
||||||
GroupsCustomerRole.group_id == group_uuid,
|
GroupsCustomerRole.group_id == group_uuid,
|
||||||
GroupsCustomerRole.customer_id == customer_id,
|
GroupsCustomerRole.region_id == region_id)
|
||||||
GroupsCustomerRole.region_id == region_id,
|
return group.all()
|
||||||
GroupsCustomerRole.role_id == role_id)
|
|
||||||
return group.first()
|
|
||||||
|
|
||||||
except Exception as exception:
|
except Exception as exception:
|
||||||
message = "Failed to get group/project by keys: " \
|
message = "Failed to get roles by region: " \
|
||||||
" group_uuid:%s customer_id:%s region_name:%s " \
|
" group_uuid:%s region_name:%s " \
|
||||||
" role_id:%s " \
|
% group_uuid, region_name
|
||||||
% group_uuid, customer_id, region_id, role_id
|
|
||||||
LOG.log_exception(message, exception)
|
LOG.log_exception(message, exception)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -40,11 +40,9 @@ class GroupsDomainRoleRecord:
|
|||||||
str(groups_domain_role), exception)
|
str(groups_domain_role), exception)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def get_domain_role_by_keys(self,
|
def get_domain_roles_by_region(self,
|
||||||
group_uuid,
|
group_uuid,
|
||||||
region_name,
|
region_name):
|
||||||
domain,
|
|
||||||
role_id):
|
|
||||||
region_record = RegionRecord(self.session)
|
region_record = RegionRecord(self.session)
|
||||||
region_id = region_record.get_region_id_from_name(region_name)
|
region_id = region_record.get_region_id_from_name(region_name)
|
||||||
if region_id is None:
|
if region_id is None:
|
||||||
@ -54,15 +52,13 @@ class GroupsDomainRoleRecord:
|
|||||||
try:
|
try:
|
||||||
group = self.session.query(GroupsDomainRole).filter(
|
group = self.session.query(GroupsDomainRole).filter(
|
||||||
GroupsDomainRole.group_id == group_uuid,
|
GroupsDomainRole.group_id == group_uuid,
|
||||||
GroupsDomainRole.domain_name == domain,
|
GroupsDomainRole.region_id == region_id)
|
||||||
GroupsDomainRole.region_id == region_id,
|
return group.all()
|
||||||
GroupsDomainRole.role_id == role_id)
|
|
||||||
return group.first()
|
|
||||||
|
|
||||||
except Exception as exception:
|
except Exception as exception:
|
||||||
message = "Failed to get group/domain by primary keys: " \
|
message = "Failed to get group roles by region: " \
|
||||||
" group_uuid:%s domain:%s region_name:%s role_id: %s" \
|
" group_uuid:%s region_name:%s " \
|
||||||
% group_uuid, domain, region_id, role_id
|
% group_uuid, region_name
|
||||||
LOG.log_exception(message, exception)
|
LOG.log_exception(message, exception)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -43,7 +43,6 @@ class GroupsRegionRecord:
|
|||||||
group_regions = []
|
group_regions = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
group_record = GroupRecord(self.session)
|
|
||||||
query = self.session.query(GroupsRegion).filter(
|
query = self.session.query(GroupsRegion).filter(
|
||||||
GroupsRegion.group_id == group_uuid,
|
GroupsRegion.group_id == group_uuid,
|
||||||
GroupsRegion.region_id != -1)
|
GroupsRegion.region_id != -1)
|
||||||
@ -57,6 +56,27 @@ class GroupsRegionRecord:
|
|||||||
LOG.log_exception(message, exception)
|
LOG.log_exception(message, exception)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def get_region_by_keys(self, group_uuid, region_name):
|
||||||
|
# get region id by name
|
||||||
|
region_record = RegionRecord(self.session)
|
||||||
|
region_id = region_record.get_region_id_from_name(region_name)
|
||||||
|
if region_id is None:
|
||||||
|
raise ValueError(
|
||||||
|
'region with the region name {0} not found'.format(
|
||||||
|
region_name))
|
||||||
|
try:
|
||||||
|
query = self.session.query(GroupsRegion).filter(
|
||||||
|
GroupsRegion.group_id == group_uuid,
|
||||||
|
GroupsRegion.region_id == region_id)
|
||||||
|
|
||||||
|
return query.first()
|
||||||
|
except Exception as exception:
|
||||||
|
message = "Failed to get groups region record by keys: " \
|
||||||
|
" group_uuid:%s region_name:%s " \
|
||||||
|
% group_uuid, region_name
|
||||||
|
LOG.log_exception(message, exception)
|
||||||
|
raise
|
||||||
|
|
||||||
def delete_region_for_group(self, group_uuid, region_name):
|
def delete_region_for_group(self, group_uuid, region_name):
|
||||||
# get region id by name
|
# get region id by name
|
||||||
region_record = RegionRecord(self.session)
|
region_record = RegionRecord(self.session)
|
||||||
|
@ -129,12 +129,8 @@ class Groups(Base, CMSBaseModel):
|
|||||||
regions = [group_region.to_wsme() for group_region in
|
regions = [group_region.to_wsme() for group_region in
|
||||||
self.group_regions if group_region.region_id != -1]
|
self.group_regions if group_region.region_id != -1]
|
||||||
|
|
||||||
roles = []
|
roles = [group_role.get_role_name() for group_role in
|
||||||
customer_roles = [customer_role.get_role_name() for customer_role in
|
self.groups_roles]
|
||||||
self.groups_customer_roles]
|
|
||||||
domain_roles = [domain_role.get_role_name() for domain_role in
|
|
||||||
self.groups_domain_roles]
|
|
||||||
roles = sorted(set(customer_roles + domain_roles))
|
|
||||||
|
|
||||||
users = []
|
users = []
|
||||||
unique_domain = {}
|
unique_domain = {}
|
||||||
@ -153,7 +149,7 @@ class Groups(Base, CMSBaseModel):
|
|||||||
name=name,
|
name=name,
|
||||||
uuid=uuid,
|
uuid=uuid,
|
||||||
regions=regions,
|
regions=regions,
|
||||||
roles=list(roles),
|
roles=sorted(roles),
|
||||||
users=users,
|
users=users,
|
||||||
enabled=enabled,
|
enabled=enabled,
|
||||||
domain=domain_name)
|
domain=domain_name)
|
||||||
@ -255,6 +251,9 @@ class GroupsRole(Base, CMSBaseModel):
|
|||||||
"group_id": self.group_id
|
"group_id": self.group_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def get_role_name(self):
|
||||||
|
return self.role.name
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
' GroupsCustomerRole is a DataObject and contains all the fields defined in
|
' GroupsCustomerRole is a DataObject and contains all the fields defined in
|
||||||
@ -299,9 +298,6 @@ class GroupsCustomerRole(Base, CMSBaseModel):
|
|||||||
"role_name": self.groups_role.role.name
|
"role_name": self.groups_role.role.name
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_role_name(self):
|
|
||||||
return self.groups_role.role.name
|
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
' GroupsDomainRole is a DataObject and contains all the fields defined in
|
' GroupsDomainRole is a DataObject and contains all the fields defined in
|
||||||
@ -344,9 +340,6 @@ class GroupsDomainRole(Base, CMSBaseModel):
|
|||||||
"role_name": self.groups_role.role.name
|
"role_name": self.groups_role.role.name
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_role_name(self):
|
|
||||||
return self.groups_role.role.name
|
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
' GroupsUser is a DataObject and contains all the fields defined in GroupRole
|
' GroupsUser is a DataObject and contains all the fields defined in GroupRole
|
||||||
|
@ -45,6 +45,7 @@
|
|||||||
"groups:add_region": "rule:admin_or_support_or_creator",
|
"groups:add_region": "rule:admin_or_support_or_creator",
|
||||||
"groups:delete_region": "rule:admin_or_creator",
|
"groups:delete_region": "rule:admin_or_creator",
|
||||||
"groups:assign_role": "rule:admin_or_support_or_creator",
|
"groups:assign_role": "rule:admin_or_support_or_creator",
|
||||||
|
"groups:assign_region_role": "rule:admin_or_support_or_creator",
|
||||||
"groups:unassign_role": "rule:admin_or_creator",
|
"groups:unassign_role": "rule:admin_or_creator",
|
||||||
"groups:add_group_default_users": "rule:admin_or_support",
|
"groups:add_group_default_users": "rule:admin_or_support",
|
||||||
"groups:delete_group_default_user": "rule:admin",
|
"groups:delete_group_default_user": "rule:admin",
|
||||||
|
@ -3,9 +3,9 @@ from pecan import conf, request
|
|||||||
import requests
|
import requests
|
||||||
|
|
||||||
from orm.common.orm_common.utils import utils
|
from orm.common.orm_common.utils import utils
|
||||||
from orm.common.orm_common.utils.cross_api_utils import (get_regions_of_group,
|
from orm.common.orm_common.utils.cross_api_utils import (
|
||||||
|
get_regions_of_group,
|
||||||
set_utils_conf)
|
set_utils_conf)
|
||||||
|
|
||||||
from orm.services.customer_manager.cms_rest.data.data_manager import \
|
from orm.services.customer_manager.cms_rest.data.data_manager import \
|
||||||
DataManager
|
DataManager
|
||||||
from orm.services.customer_manager.cms_rest.logger import get_logger
|
from orm.services.customer_manager.cms_rest.logger import get_logger
|
||||||
@ -91,7 +91,8 @@ class GroupLogic(object):
|
|||||||
def assign_roles(self,
|
def assign_roles(self,
|
||||||
group_uuid,
|
group_uuid,
|
||||||
role_assignments,
|
role_assignments,
|
||||||
transaction_id):
|
transaction_id,
|
||||||
|
region=None):
|
||||||
|
|
||||||
datamanager = DataManager()
|
datamanager = DataManager()
|
||||||
try:
|
try:
|
||||||
@ -99,8 +100,16 @@ class GroupLogic(object):
|
|||||||
|
|
||||||
group_record = datamanager.get_record('group')
|
group_record = datamanager.get_record('group')
|
||||||
region_record = datamanager.get_record('groups_region')
|
region_record = datamanager.get_record('groups_region')
|
||||||
groups_regions = region_record.get_regions_for_group(group_uuid)
|
|
||||||
|
|
||||||
|
# If region is not specified, then get all the regions already
|
||||||
|
# associated with the group for role assginement; otherwise,
|
||||||
|
# just assign roles for the passed in region only.
|
||||||
|
if region is None:
|
||||||
|
groups_regions = region_record.get_regions_for_group(
|
||||||
|
group_uuid)
|
||||||
|
else:
|
||||||
|
groups_regions = [region_record.get_region_by_keys(
|
||||||
|
group_uuid, region)]
|
||||||
for role_assignment in role_assignments:
|
for role_assignment in role_assignments:
|
||||||
for role in role_assignment.roles:
|
for role in role_assignment.roles:
|
||||||
role_id = datamanager.get_role_id_by_name(role)
|
role_id = datamanager.get_role_id_by_name(role)
|
||||||
@ -704,27 +713,60 @@ class GroupLogic(object):
|
|||||||
if customer_uuid is not None and domain_name is not None:
|
if customer_uuid is not None and domain_name is not None:
|
||||||
raise ErrorStatus(400, "customer and domain cannot be used at "
|
raise ErrorStatus(400, "customer and domain cannot be used at "
|
||||||
"the same time for query in request uri.")
|
"the same time for query in request uri.")
|
||||||
if customer_uuid is None and domain_name is None:
|
|
||||||
raise ErrorStatus(400, "customer or domain is required for query "
|
|
||||||
"in request uri.")
|
|
||||||
|
|
||||||
|
role_result = []
|
||||||
|
roles = []
|
||||||
datamanager = DataManager()
|
datamanager = DataManager()
|
||||||
|
# filter by region
|
||||||
|
if customer_uuid is None and domain_name is None:
|
||||||
|
record = datamanager.get_record('groups_customer_role')
|
||||||
|
sql_customers_roles = record.get_customer_roles_by_region(
|
||||||
|
group_uuid, region_name)
|
||||||
|
|
||||||
|
record = datamanager.get_record('groups_domain_role')
|
||||||
|
sql_domains_roles = record.get_domain_roles_by_region(
|
||||||
|
group_uuid, region_name)
|
||||||
|
|
||||||
|
unique_customer = {}
|
||||||
|
for customer in sql_customers_roles:
|
||||||
|
if customer.customer.uuid in unique_customer:
|
||||||
|
unique_customer[customer.customer.uuid].append(customer.groups_role.role.name)
|
||||||
|
else:
|
||||||
|
unique_customer[customer.customer.uuid] = [customer.groups_role.role.name]
|
||||||
|
|
||||||
|
for customer, role_list in unique_customer.items():
|
||||||
|
role_result.append(RoleResult(roles=role_list, customer=customer))
|
||||||
|
|
||||||
|
unique_domain = {}
|
||||||
|
for domain in sql_domains_roles:
|
||||||
|
if domain.domain_name in unique_domain:
|
||||||
|
unique_domain[domain.domain_name].append(domain.groups_role.role.name)
|
||||||
|
else:
|
||||||
|
unique_domain[domain.domain_name] = [domain.groups_role.role.name]
|
||||||
|
|
||||||
|
for domain, role_list in unique_domain.items():
|
||||||
|
role_result.append(RoleResult(roles=role_list, domain=domain))
|
||||||
|
|
||||||
|
return role_result
|
||||||
|
|
||||||
|
# filter by customer
|
||||||
|
elif customer_uuid is not None:
|
||||||
customer_id = datamanager.get_customer_id_by_uuid(customer_uuid)
|
customer_id = datamanager.get_customer_id_by_uuid(customer_uuid)
|
||||||
if customer_uuid is not None:
|
|
||||||
record = datamanager.get_record('groups_customer_role')
|
record = datamanager.get_record('groups_customer_role')
|
||||||
sql_roles = record.get_customer_roles_by_criteria(
|
sql_roles = record.get_customer_roles_by_criteria(
|
||||||
group_uuid, region_name, customer_id)
|
group_uuid, region_name, customer_id)
|
||||||
|
|
||||||
|
# filter by domain
|
||||||
else:
|
else:
|
||||||
record = datamanager.get_record('groups_domain_role')
|
record = datamanager.get_record('groups_domain_role')
|
||||||
sql_roles = record.get_domain_roles_by_criteria(
|
sql_roles = record.get_domain_roles_by_criteria(
|
||||||
group_uuid, region_name, domain_name)
|
group_uuid, region_name, domain_name)
|
||||||
|
|
||||||
roles = []
|
|
||||||
if sql_roles:
|
if sql_roles:
|
||||||
roles = [sql_role.groups_role.role.name for sql_role in sql_roles
|
roles = [sql_role.groups_role.role.name for sql_role in sql_roles
|
||||||
if sql_role and sql_role.groups_role.role.name]
|
if sql_role and sql_role.groups_role.role.name]
|
||||||
|
|
||||||
return RoleResult(roles=roles)
|
return [RoleResult(roles=roles)]
|
||||||
|
|
||||||
def delete_group_by_uuid(self, group_id):
|
def delete_group_by_uuid(self, group_id):
|
||||||
datamanager = DataManager()
|
datamanager = DataManager()
|
||||||
|
@ -331,7 +331,6 @@ class TestGroupLogic(FunctionalTest):
|
|||||||
logic.get_group_roles_by_criteria(
|
logic.get_group_roles_by_criteria(
|
||||||
'group_uuid', 'region', None, 'domain')
|
'group_uuid', 'region', None, 'domain')
|
||||||
|
|
||||||
self.assertTrue(data_manager_mock.get_customer_id_by_uuid.called)
|
|
||||||
self.assertTrue(data_manager_mock.get_record.called)
|
self.assertTrue(data_manager_mock.get_record.called)
|
||||||
self.assertTrue(record_mock.get_domain_roles_by_criteria.called)
|
self.assertTrue(record_mock.get_domain_roles_by_criteria.called)
|
||||||
|
|
||||||
@ -342,13 +341,6 @@ class TestGroupLogic(FunctionalTest):
|
|||||||
self.assertEqual(cm.exception.status_code, 400)
|
self.assertEqual(cm.exception.status_code, 400)
|
||||||
self.assertIn('region must be specified', cm.exception.message)
|
self.assertIn('region must be specified', cm.exception.message)
|
||||||
|
|
||||||
def test_get_group_roles_by_criteria_missing_optional_parms(self):
|
|
||||||
logic = group_logic.GroupLogic()
|
|
||||||
with self.assertRaises(ErrorStatus) as cm:
|
|
||||||
logic.get_group_roles_by_criteria('group', 'region', None, None)
|
|
||||||
self.assertEqual(cm.exception.status_code, 400)
|
|
||||||
self.assertIn('customer or domain is required', cm.exception.message)
|
|
||||||
|
|
||||||
def test_get_group_roles_by_criteria_conflicting_optional_parms(self):
|
def test_get_group_roles_by_criteria_conflicting_optional_parms(self):
|
||||||
logic = group_logic.GroupLogic()
|
logic = group_logic.GroupLogic()
|
||||||
with self.assertRaises(ErrorStatus) as cm:
|
with self.assertRaises(ErrorStatus) as cm:
|
||||||
|
@ -181,7 +181,7 @@ def get_mock_group_logic():
|
|||||||
|
|
||||||
group_logic_mock.assign_roles.return_value = res
|
group_logic_mock.assign_roles.return_value = res
|
||||||
group_logic_mock.unassign_roles.return_value = res1
|
group_logic_mock.unassign_roles.return_value = res1
|
||||||
group_logic_mock.get_group_roles_by_criteria.return_value = list_res
|
group_logic_mock.get_group_roles_by_criteria.return_value = [list_res]
|
||||||
|
|
||||||
elif roles.GroupLogic.return_error == 1:
|
elif roles.GroupLogic.return_error == 1:
|
||||||
group_logic_mock.assign_roles.side_effect = SystemError()
|
group_logic_mock.assign_roles.side_effect = SystemError()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user