From 209f8e9e171981f2714f3ac63e8ad947409a60d6 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Thu, 16 May 2024 12:01:44 +0100 Subject: [PATCH] network: Replace use of in-tree API client None of these are actually supported by openstacksdk (intentionally so) so we add our own manual implementations. Change-Id: Ifd24f04ae4d1e56e0ce5ba0afe63828403bb7a6f Signed-off-by: Stephen Finucane --- openstackclient/api/compute_v2.py | 815 +++++---------- openstackclient/compute/client.py | 20 - openstackclient/compute/v2/server.py | 11 +- openstackclient/network/common.py | 7 +- openstackclient/network/v2/floating_ip.py | 14 +- .../network/v2/floating_ip_pool.py | 19 +- openstackclient/network/v2/network.py | 11 +- openstackclient/network/v2/security_group.py | 30 +- .../network/v2/security_group_rule.py | 33 +- .../tests/unit/api/test_compute_v2.py | 975 ++++++------------ .../tests/unit/compute/v2/fakes.py | 6 - .../tests/unit/compute/v2/test_server.py | 78 +- .../tests/unit/network/test_common.py | 4 +- .../network/v2/test_floating_ip_compute.py | 70 +- .../v2/test_floating_ip_pool_compute.py | 9 +- .../unit/network/v2/test_network_compute.py | 121 +-- .../network/v2/test_security_group_compute.py | 102 +- .../v2/test_security_group_rule_compute.py | 94 +- 18 files changed, 917 insertions(+), 1502 deletions(-) diff --git a/openstackclient/api/compute_v2.py b/openstackclient/api/compute_v2.py index 4fcf3f1344..95521eae9a 100644 --- a/openstackclient/api/compute_v2.py +++ b/openstackclient/api/compute_v2.py @@ -9,7 +9,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# """Compute v2 API Library @@ -19,571 +18,49 @@ intentionally supported by SDK. Most of these are proxy APIs. import http -from keystoneauth1 import exceptions as ksa_exceptions from openstack import exceptions as sdk_exceptions -from osc_lib.api import api from osc_lib import exceptions -from osc_lib.i18n import _ -# TODO(dtroyer): Migrate this to osc-lib -class InvalidValue(Exception): - """An argument value is not valid: wrong type, out of range, etc""" +# security groups - message = "Supplied value is not valid" +def create_security_group(compute_client, name=None, description=None): + """Create a new security group -class APIv2(api.BaseAPI): - """Compute v2 API""" + https://docs.openstack.org/api-ref/compute/#create-security-group - def __init__(self, **kwargs): - super().__init__(**kwargs) + :param compute_client: A compute client + :param str name: Security group name + :param str description: Security group description + :returns: A security group object + """ + data = { + 'name': name, + 'description': description, + } + response = compute_client.post( + '/os-security-groups', data=data, microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + return response.json()['security_group'] - # Overrides - def _check_integer(self, value, msg=None): - """Attempt to convert value to an integer +def list_security_groups(compute_client, all_projects=None): + """Get all security groups - Raises InvalidValue on failure + https://docs.openstack.org/api-ref/compute/#list-security-groups - :param value: - Convert this to an integer. None is converted to 0 (zero). - :param msg: - An alternate message for the exception, must include exactly - one substitution to receive the attempted value. - """ - - if value is None: - return 0 - - try: - value = int(value) - except (TypeError, ValueError): - if not msg: - msg = _("%s is not an integer") % value - raise InvalidValue(msg) - return value - - # TODO(dtroyer): Override find() until these fixes get into an osc-lib - # minimum release - def find( - self, - path, - value=None, - attr=None, - ): - """Find a single resource by name or ID - - :param string path: - The API-specific portion of the URL path - :param string value: - search expression (required, really) - :param string attr: - name of attribute for secondary search - """ - - try: - ret = self._request('GET', f"/{path}/{value}").json() - if isinstance(ret, dict): - # strip off the enclosing dict - key = list(ret.keys())[0] - ret = ret[key] - except ( - ksa_exceptions.NotFound, - ksa_exceptions.BadRequest, - ): - kwargs = {attr: value} - try: - ret = self.find_one(path, **kwargs) - except ksa_exceptions.NotFound: - msg = _("%s not found") % value - raise exceptions.NotFound(msg) - - return ret - - # Floating IPs - - def floating_ip_add( - self, - server, - address, - fixed_address=None, - ): - """Add a floating IP to a server - - :param server: - The :class:`Server` (or its ID) to add an IP to. - :param address: - The FloatingIP or string floating address to add. - :param fixed_address: - The FixedIP the floatingIP should be associated with (optional) - """ - - url = '/servers' - - server = self.find( - url, - attr='name', - value=server, - ) - - address = address.ip if hasattr(address, 'ip') else address - if fixed_address: - if hasattr(fixed_address, 'ip'): - fixed_address = fixed_address.ip - - body = { - 'address': address, - 'fixed_address': fixed_address, - } - else: - body = { - 'address': address, - } - - return self._request( - "POST", - "/{}/{}/action".format(url, server['id']), - json={'addFloatingIp': body}, - ) - - def floating_ip_create( - self, - pool=None, - ): - """Create a new floating ip - - https://docs.openstack.org/api-ref/compute/#create-allocate-floating-ip-address - - :param pool: Name of floating IP pool - """ - - url = "/os-floating-ips" - - try: - return self.create( - url, - json={'pool': pool}, - )['floating_ip'] - except ( - ksa_exceptions.NotFound, - ksa_exceptions.BadRequest, - ): - msg = _("%s not found") % pool - raise exceptions.NotFound(msg) - - def floating_ip_delete( - self, - floating_ip_id=None, - ): - """Delete a floating IP - - https://docs.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address - - :param string floating_ip_id: - Floating IP ID - """ - - url = "/os-floating-ips" - - if floating_ip_id is not None: - return self.delete(f'/{url}/{floating_ip_id}') - - return None - - def floating_ip_find( - self, - floating_ip=None, - ): - """Return a security group given name or ID - - https://docs.openstack.org/api-ref/compute/#list-floating-ip-addresses - - :param string floating_ip: - Floating IP address - :returns: A dict of the floating IP attributes - """ - - url = "/os-floating-ips" - - return self.find( - url, - attr='ip', - value=floating_ip, - ) - - def floating_ip_list( - self, - ): - """Get floating IPs - - https://docs.openstack.org/api-ref/compute/#show-floating-ip-address-details - - :returns: - list of floating IPs - """ - - url = "/os-floating-ips" - - return self.list(url)["floating_ips"] - - def floating_ip_remove( - self, - server, - address, - ): - """Remove a floating IP from a server - - :param server: - The :class:`Server` (or its ID) to add an IP to. - :param address: - The FloatingIP or string floating address to add. - """ - - url = '/servers' - - server = self.find( - url, - attr='name', - value=server, - ) - - address = address.ip if hasattr(address, 'ip') else address - body = { - 'address': address, - } - - return self._request( - "POST", - "/{}/{}/action".format(url, server['id']), - json={'removeFloatingIp': body}, - ) - - # Floating IP Pools - - def floating_ip_pool_list( - self, - ): - """Get floating IP pools - - https://docs.openstack.org/api-ref/compute/?expanded=#list-floating-ip-pools - - :returns: - list of floating IP pools - """ - - url = "/os-floating-ip-pools" - - return self.list(url)["floating_ip_pools"] - - # Networks - - def network_create( - self, - name=None, - subnet=None, - share_subnet=None, - ): - """Create a new network - - https://docs.openstack.org/api-ref/compute/#create-network - - :param string name: - Network label (required) - :param integer subnet: - Subnet for IPv4 fixed addresses in CIDR notation (required) - :param integer share_subnet: - Shared subnet between projects, True or False - :returns: A dict of the network attributes - """ - - url = "/os-networks" - - params = { - 'label': name, - 'cidr': subnet, - } - if share_subnet is not None: - params['share_address'] = share_subnet - - return self.create( - url, - json={'network': params}, - )['network'] - - def network_delete( - self, - network=None, - ): - """Delete a network - - https://docs.openstack.org/api-ref/compute/#delete-network - - :param string network: - Network name or ID - """ - - url = "/os-networks" - - network = self.find( - url, - attr='label', - value=network, - )['id'] - if network is not None: - return self.delete(f'/{url}/{network}') - - return None - - def network_find( - self, - network=None, - ): - """Return a network given name or ID - - https://docs.openstack.org/api-ref/compute/#show-network-details - - :param string network: - Network name or ID - :returns: A dict of the network attributes - """ - - url = "/os-networks" - - return self.find( - url, - attr='label', - value=network, - ) - - def network_list( - self, - ): - """Get networks - - https://docs.openstack.org/api-ref/compute/#list-networks - - :returns: - list of networks - """ - - url = "/os-networks" - - return self.list(url)["networks"] - - # Security Groups - - def security_group_create( - self, - name=None, - description=None, - ): - """Create a new security group - - https://docs.openstack.org/api-ref/compute/#create-security-group - - :param string name: - Security group name - :param integer description: - Security group description - """ - - url = "/os-security-groups" - - params = { - 'name': name, - 'description': description, - } - - return self.create( - url, - json={'security_group': params}, - )['security_group'] - - def security_group_delete( - self, - security_group=None, - ): - """Delete a security group - - https://docs.openstack.org/api-ref/compute/#delete-security-group - - :param string security_group: - Security group name or ID - """ - - url = "/os-security-groups" - - security_group = self.find( - url, - attr='name', - value=security_group, - )['id'] - if security_group is not None: - return self.delete(f'/{url}/{security_group}') - - return None - - def security_group_find( - self, - security_group=None, - ): - """Return a security group given name or ID - - https://docs.openstack.org/api-ref/compute/#show-security-group-details - - :param string security_group: - Security group name or ID - :returns: A dict of the security group attributes - """ - - url = "/os-security-groups" - - return self.find( - url, - attr='name', - value=security_group, - ) - - def security_group_list( - self, - limit=None, - marker=None, - search_opts=None, - ): - """Get security groups - - https://docs.openstack.org/api-ref/compute/#list-security-groups - - :param integer limit: - query return count limit - :param string marker: - query marker - :param search_opts: - (undocumented) Search filter dict - all_tenants: True|False - return all projects - :returns: - list of security groups names - """ - - params = {} - if search_opts is not None: - params = {k: v for (k, v) in search_opts.items() if v} - if limit: - params['limit'] = limit - if marker: - params['offset'] = marker - - url = "/os-security-groups" - return self.list(url, **params)["security_groups"] - - def security_group_set( - self, - security_group=None, - # name=None, - # description=None, - **params, - ): - """Update a security group - - https://docs.openstack.org/api-ref/compute/#update-security-group - - :param string security_group: - Security group name or ID - - TODO(dtroyer): Create an update method in osc-lib - """ - - # Short-circuit no-op - if params is None: - return None - - url = "/os-security-groups" - - security_group = self.find( - url, - attr='name', - value=security_group, - ) - if security_group is not None: - for k, v in params.items(): - # Only set a value if it is already present - if k in security_group: - security_group[k] = v - return self._request( - "PUT", - "/{}/{}".format(url, security_group['id']), - json={'security_group': security_group}, - ).json()['security_group'] - return None - - # Security Group Rules - - def security_group_rule_create( - self, - security_group_id=None, - ip_protocol=None, - from_port=None, - to_port=None, - remote_ip=None, - remote_group=None, - ): - """Create a new security group rule - - https://docs.openstack.org/api-ref/compute/#create-security-group-rule - - :param string security_group_id: - Security group ID - :param ip_protocol: - IP protocol, 'tcp', 'udp' or 'icmp' - :param from_port: - Source port - :param to_port: - Destination port - :param remote_ip: - Source IP address in CIDR notation - :param remote_group: - Remote security group - """ - - url = "/os-security-group-rules" - - if ip_protocol.lower() not in ['icmp', 'tcp', 'udp']: - raise InvalidValue( - "%(s) is not one of 'icmp', 'tcp', or 'udp'" % ip_protocol - ) - - params = { - 'parent_group_id': security_group_id, - 'ip_protocol': ip_protocol, - 'from_port': self._check_integer(from_port), - 'to_port': self._check_integer(to_port), - 'cidr': remote_ip, - 'group_id': remote_group, - } - - return self.create( - url, - json={'security_group_rule': params}, - )['security_group_rule'] - - def security_group_rule_delete( - self, - security_group_rule_id=None, - ): - """Delete a security group rule - - https://docs.openstack.org/api-ref/compute/#delete-security-group-rule - - :param string security_group_rule_id: - Security group rule ID - """ - - url = "/os-security-group-rules" - if security_group_rule_id is not None: - return self.delete(f'/{url}/{security_group_rule_id}') - - return None + :param compute_client: A compute client + :param bool all_projects: If true, list from all projects + :returns: A list of security group objects + """ + url = '/os-security-groups' + if all_projects is not None: + url += f'?all_tenants={all_projects}' + response = compute_client.get(url, microversion='2.1') + sdk_exceptions.raise_from_response(response) + return response.json()['security_groups'] def find_security_group(compute_client, name_or_id): @@ -623,6 +100,145 @@ def find_security_group(compute_client, name_or_id): return found +def update_security_group( + compute_client, security_group_id, name=None, description=None +): + """Update an existing security group + + https://docs.openstack.org/api-ref/compute/#update-security-group + + :param compute_client: A compute client + :param str security_group_id: The ID of the security group to update + :param str name: Security group name + :param str description: Security group description + :returns: A security group object + """ + data = {} + if name: + data['name'] = name + if description: + data['description'] = description + response = compute_client.put( + f'/os-security-groups/{security_group_id}', + data=data, + microversion='2.1', + ) + sdk_exceptions.raise_from_response(response) + return response.json()['security_group'] + + +def delete_security_group(compute_client, security_group_id=None): + """Delete a security group + + https://docs.openstack.org/api-ref/compute/#delete-security-group + + :param compute_client: A compute client + :param str security_group_id: Security group ID + :returns: None + """ + response = compute_client.delete( + f'/os-security-groups/{security_group_id}', microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + + +# security group rules + + +def create_security_group_rule( + compute_client, + security_group_id=None, + ip_protocol=None, + from_port=None, + to_port=None, + remote_ip=None, + remote_group=None, +): + """Create a new security group rule + + https://docs.openstack.org/api-ref/compute/#create-security-group-rule + + :param compute_client: A compute client + :param str security_group_id: Security group ID + :param str ip_protocol: IP protocol, 'tcp', 'udp' or 'icmp' + :param int from_port: Source port + :param int to_port: Destination port + :param str remote_ip: Source IP address in CIDR notation + :param str remote_group: Remote security group + :returns: A security group object + """ + data = { + 'parent_group_id': security_group_id, + 'ip_protocol': ip_protocol, + 'from_port': from_port, + 'to_port': to_port, + 'cidr': remote_ip, + 'group_id': remote_group, + } + response = compute_client.post( + '/os-security-group-rules', data=data, microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + return response.json()['security_group_rule'] + + +def delete_security_group_rule(compute_client, security_group_rule_id=None): + """Delete a security group rule + + https://docs.openstack.org/api-ref/compute/#delete-security-group-rule + + :param compute_client: A compute client + :param str security_group_rule_id: Security group rule ID + :returns: None + """ + response = compute_client.delete( + f'/os-security-group-rules/{security_group_rule_id}', + microversion='2.1', + ) + sdk_exceptions.raise_from_response(response) + + +# networks + + +def create_network(compute_client, name, subnet, share_subnet=None): + """Create a new network + + https://docs.openstack.org/api-ref/compute/#create-network + + :param compute_client: A compute client + :param str name: Network label + :param int subnet: Subnet for IPv4 fixed addresses in CIDR notation + :param bool share_subnet: Shared subnet between projects + :returns: A network object + """ + data = { + 'label': name, + 'cidr': subnet, + } + if share_subnet is not None: + data['share_address'] = share_subnet + + response = compute_client.post( + '/os-networks', data=data, microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + return response.json()['network'] + + +def list_networks(compute_client): + """Get all networks + + https://docs.openstack.org/api-ref/compute/#list-networks + + :param compute_client: A compute client + :returns: A list of network objects + """ + response = compute_client.get('/os-networks', microversion='2.1') + sdk_exceptions.raise_from_response(response) + return response.json()['networks'] + + def find_network(compute_client, name_or_id): """Find the ID for a given network name or ID @@ -658,3 +274,94 @@ def find_network(compute_client, name_or_id): raise exceptions.NotFound(f'{name_or_id} not found') return found + + +def delete_network(compute_client, network_id): + """Delete a network + + https://docs.openstack.org/api-ref/compute/#delete-network + + :param compute_client: A compute client + :param string network_id: The network ID + :returns: None + """ + response = compute_client.delete( + f'/os-networks/{network_id}', microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + + +# floating ips + + +def create_floating_ip(compute_client, network): + """Create a new floating ip + + https://docs.openstack.org/api-ref/compute/#create-allocate-floating-ip-address + + :param network: Name of floating IP pool + :returns: A floating IP object + """ + response = compute_client.post( + '/os-floating-ips', data={'pool': network}, microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + return response.json()['floating_ip'] + + +def list_floating_ips(compute_client): + """Get all floating IPs + + https://docs.openstack.org/api-ref/compute/#list-floating-ip-addresses + + :returns: A list of floating IP objects + """ + response = compute_client.get('/os-floating-ips', microversion='2.1') + sdk_exceptions.raise_from_response(response) + return response.json()['floating_ips'] + + +def get_floating_ip(compute_client, floating_ip_id): + """Get a floating IP + + https://docs.openstack.org/api-ref/compute/#show-floating-ip-address-details + + :param string floating_ip_id: The floating IP address + :returns: A floating IP object + """ + response = compute_client.get( + f'/os-floating-ips/{floating_ip_id}', microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + return response.json()['floating_ip'] + + +def delete_floating_ip(compute_client, floating_ip_id): + """Delete a floating IP + + https://docs.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address + + :param string floating_ip_id: The floating IP address + :returns: None + """ + response = compute_client.delete( + f'/os-floating-ips/{floating_ip_id}', microversion='2.1' + ) + sdk_exceptions.raise_from_response(response) + + +# floating ip pools + + +def list_floating_ip_pools(compute_client): + """Get all floating IP pools + + https://docs.openstack.org/api-ref/compute/#list-floating-ip-pools + + :param compute_client: A compute client + :returns: A list of floating IP pool objects + """ + response = compute_client.get('/os-floating-ip-pools', microversion='2.1') + sdk_exceptions.raise_from_response(response) + + return response.json()['floating_ip_pools'] diff --git a/openstackclient/compute/client.py b/openstackclient/compute/client.py index e833d53b8c..157e0dddc4 100644 --- a/openstackclient/compute/client.py +++ b/openstackclient/compute/client.py @@ -31,7 +31,6 @@ API_VERSIONS = { "2.1": "novaclient.client", } -COMPUTE_API_TYPE = 'compute' COMPUTE_API_VERSIONS = { '2': 'openstackclient.api.compute_v2.APIv2', } @@ -63,15 +62,6 @@ def make_client(instance): # fallback to use the max version of novaclient side. version = novaclient.API_MAX_VERSION - LOG.debug('Instantiating compute client for %s', version) - - compute_api = utils.get_client_class( - API_NAME, - version.ver_major, - COMPUTE_API_VERSIONS, - ) - LOG.debug('Instantiating compute api: %s', compute_api) - # Set client http_log_debug to True if verbosity level is high enough http_log_debug = utils.get_effective_log_level() <= logging.DEBUG @@ -94,16 +84,6 @@ def make_client(instance): **kwargs ) - client.api = compute_api( - session=instance.session, - service_type=COMPUTE_API_TYPE, - endpoint=instance.get_endpoint_for_service_type( - COMPUTE_API_TYPE, - region_name=instance.region_name, - interface=instance.interface, - ), - ) - return client diff --git a/openstackclient/compute/v2/server.py b/openstackclient/compute/v2/server.py index 22ab4f11c6..78ab16d72b 100644 --- a/openstackclient/compute/v2/server.py +++ b/openstackclient/compute/v2/server.py @@ -552,8 +552,9 @@ class AddFloatingIP(network_common.NetworkAndComputeCommand): raise error def take_action_compute(self, client, parsed_args): - client.api.floating_ip_add( - parsed_args.server, + server = client.find_server(parsed_args.server, ignore_missing=False) + client.add_floating_ip_to_server( + server, parsed_args.ip_address, fixed_address=parsed_args.fixed_ip_address, ) @@ -3921,10 +3922,8 @@ class RemoveFloatingIP(network_common.NetworkAndComputeCommand): client.update_ip(obj, **attrs) def take_action_compute(self, client, parsed_args): - client.api.floating_ip_remove( - parsed_args.server, - parsed_args.ip_address, - ) + server = client.find_server(parsed_args.server, ignore_missing=False) + client.remove_floating_ip_from_server(server, parsed_args.ip_address) class RemovePort(command.Command): diff --git a/openstackclient/network/common.py b/openstackclient/network/common.py index 0dc291c926..4c01f90ef4 100644 --- a/openstackclient/network/common.py +++ b/openstackclient/network/common.py @@ -161,7 +161,7 @@ class NetDetectionMixin(metaclass=abc.ABCMeta): ) elif self.is_nova_network: return self.take_action_compute( - self.app.client_manager.compute, parsed_args + self.app.client_manager.sdk_connection.compute, parsed_args ) def take_action_network(self, client, parsed_args): @@ -210,7 +210,8 @@ class NetworkAndComputeDelete(NetworkAndComputeCommand, metaclass=abc.ABCMeta): ) else: self.take_action_compute( - self.app.client_manager.compute, parsed_args + self.app.client_manager.sdk_connection.compute, + parsed_args, ) except Exception as e: msg = _( @@ -267,7 +268,7 @@ class NetworkAndComputeShowOne( ) else: return self.take_action_compute( - self.app.client_manager.compute, parsed_args + self.app.client_manager.sdk_connection.compute, parsed_args ) except openstack.exceptions.HttpException as exc: msg = _("Error while executing command: %s") % exc.message diff --git a/openstackclient/network/v2/floating_ip.py b/openstackclient/network/v2/floating_ip.py index fef7b7d274..301ae716df 100644 --- a/openstackclient/network/v2/floating_ip.py +++ b/openstackclient/network/v2/floating_ip.py @@ -9,18 +9,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# """IP Floating action implementations""" from osc_lib import utils from osc_lib.utils import tags as _tag +from openstackclient.api import compute_v2 from openstackclient.i18n import _ from openstackclient.identity import common as identity_common from openstackclient.network import common - _formatters = { 'port_details': utils.format_dict, } @@ -200,7 +199,7 @@ class CreateFloatingIP( return (display_columns, data) def take_action_compute(self, client, parsed_args): - obj = client.api.floating_ip_create(parsed_args.network) + obj = compute_v2.create_floating_ip(client, parsed_args.network) columns = _get_columns(obj) data = utils.get_dict_properties(obj, columns) return (columns, data) @@ -230,7 +229,7 @@ class DeleteFloatingIP(common.NetworkAndComputeDelete): client.delete_ip(obj) def take_action_compute(self, client, parsed_args): - client.api.floating_ip_delete(self.r) + compute_v2.delete_floating_ip(client, self.r) class ListFloatingIP(common.NetworkAndComputeLister): @@ -421,8 +420,7 @@ class ListFloatingIP(common.NetworkAndComputeLister): 'Pool', ) - data = client.api.floating_ip_list() - + objs = compute_v2.list_floating_ips(client) return ( headers, ( @@ -431,7 +429,7 @@ class ListFloatingIP(common.NetworkAndComputeLister): columns, formatters={}, ) - for s in data + for s in objs ), ) @@ -538,7 +536,7 @@ class ShowFloatingIP(common.NetworkAndComputeShowOne): return (display_columns, data) def take_action_compute(self, client, parsed_args): - obj = client.api.floating_ip_find(parsed_args.floating_ip) + obj = compute_v2.get_floating_ip(client, parsed_args.floating_ip) columns = _get_columns(obj) data = utils.get_dict_properties(obj, columns) return (columns, data) diff --git a/openstackclient/network/v2/floating_ip_pool.py b/openstackclient/network/v2/floating_ip_pool.py index 43d7ac8585..2030eb8ad6 100644 --- a/openstackclient/network/v2/floating_ip_pool.py +++ b/openstackclient/network/v2/floating_ip_pool.py @@ -9,14 +9,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# """Floating IP Pool action implementations""" - from osc_lib import exceptions -from osc_lib import utils +from openstackclient.api import compute_v2 from openstackclient.i18n import _ from openstackclient.network import common @@ -33,15 +31,8 @@ class ListFloatingIPPool(common.NetworkAndComputeLister): def take_action_compute(self, client, parsed_args): columns = ('Name',) - data = client.api.floating_ip_pool_list() + data = [ + (x['name'],) for x in compute_v2.list_floating_ip_pools(client) + ] - return ( - columns, - ( - utils.get_dict_properties( - s, - columns, - ) - for s in data - ), - ) + return (columns, data) diff --git a/openstackclient/network/v2/network.py b/openstackclient/network/v2/network.py index 86883b35fa..de1910ea7c 100644 --- a/openstackclient/network/v2/network.py +++ b/openstackclient/network/v2/network.py @@ -9,7 +9,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# """Network action implementations""" @@ -18,6 +17,7 @@ from osc_lib.cli import format_columns from osc_lib import utils from osc_lib.utils import tags as _tag +from openstackclient.api import compute_v2 from openstackclient.i18n import _ from openstackclient.identity import common as identity_common from openstackclient.network import common @@ -388,7 +388,7 @@ class CreateNetwork( def take_action_compute(self, client, parsed_args): attrs = _get_attrs_compute(self.app.client_manager, parsed_args) - obj = client.api.network_create(**attrs) + obj = compute_v2.create_network(client, **attrs) display_columns, columns = _get_columns_compute(obj) data = utils.get_dict_properties(obj, columns) return (display_columns, data) @@ -416,7 +416,8 @@ class DeleteNetwork(common.NetworkAndComputeDelete): client.delete_network(obj) def take_action_compute(self, client, parsed_args): - client.api.network_delete(self.r) + network = compute_v2.find_network(client, self.r) + compute_v2.delete_network(client, network['id']) # TODO(sindhu): Use the SDK resource mapped attribute names once the @@ -673,7 +674,7 @@ class ListNetwork(common.NetworkAndComputeLister): 'Subnet', ) - data = client.api.network_list() + data = compute_v2.list_networks(client) return ( column_headers, @@ -828,7 +829,7 @@ class ShowNetwork(common.NetworkAndComputeShowOne): return (display_columns, data) def take_action_compute(self, client, parsed_args): - obj = client.api.network_find(parsed_args.network) + obj = compute_v2.find_network(client, parsed_args.network) display_columns, columns = _get_columns_compute(obj) data = utils.get_dict_properties(obj, columns) return (display_columns, data) diff --git a/openstackclient/network/v2/security_group.py b/openstackclient/network/v2/security_group.py index 90b09cae84..556eff2c0d 100644 --- a/openstackclient/network/v2/security_group.py +++ b/openstackclient/network/v2/security_group.py @@ -20,6 +20,7 @@ from osc_lib.command import command from osc_lib import utils from osc_lib.utils import tags as _tag +from openstackclient.api import compute_v2 from openstackclient.i18n import _ from openstackclient.identity import common as identity_common from openstackclient.network import common @@ -180,7 +181,8 @@ class CreateSecurityGroup( def take_action_compute(self, client, parsed_args): description = self._get_description(parsed_args) - obj = client.api.security_group_create( + obj = compute_v2.create_security_group( + client, parsed_args.name, description, ) @@ -212,7 +214,8 @@ class DeleteSecurityGroup(common.NetworkAndComputeDelete): client.delete_security_group(obj) def take_action_compute(self, client, parsed_args): - client.api.security_group_delete(self.r) + security_group = compute_v2.find_security_group(client, self.r) + compute_v2.delete_security_group(client, security_group['id']) # TODO(rauta): Use the SDK resource mapped attribute names once @@ -291,10 +294,10 @@ class ListSecurityGroup(common.NetworkAndComputeLister): ) def take_action_compute(self, client, parsed_args): - search = {'all_tenants': parsed_args.all_projects} - data = client.api.security_group_list( + data = compute_v2.list_security_groups( # TODO(dtroyer): add limit, marker - search_opts=search, + client, + all_projects=parsed_args.all_projects, ) columns = ( @@ -383,20 +386,21 @@ class SetSecurityGroup( _tag.update_tags_for_set(client, obj, parsed_args) def take_action_compute(self, client, parsed_args): - data = client.api.security_group_find(parsed_args.group) + security_group = compute_v2.find_security_group( + client, parsed_args.group + ) + params = {} if parsed_args.name is not None: - data['name'] = parsed_args.name + params['name'] = parsed_args.name if parsed_args.description is not None: - data['description'] = parsed_args.description + params['description'] = parsed_args.description # NOTE(rtheis): Previous behavior did not raise a CommandError # if there were no updates. Maintain this behavior and issue # the update. - client.api.security_group_set( - data, - data['name'], - data['description'], + compute_v2.update_security_group( + client, security_group['id'], **params ) @@ -422,7 +426,7 @@ class ShowSecurityGroup(common.NetworkAndComputeShowOne): return (display_columns, data) def take_action_compute(self, client, parsed_args): - obj = client.api.security_group_find(parsed_args.group) + obj = compute_v2.find_security_group(client, parsed_args.group) display_columns, property_columns = _get_columns(obj) data = utils.get_dict_properties( obj, property_columns, formatters=_formatters_compute diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py index 5e0de120b9..80e5ff0859 100644 --- a/openstackclient/network/v2/security_group_rule.py +++ b/openstackclient/network/v2/security_group_rule.py @@ -20,6 +20,7 @@ from osc_lib.cli import parseractions from osc_lib import exceptions from osc_lib import utils +from openstackclient.api import compute_v2 from openstackclient.i18n import _ from openstackclient.identity import common as identity_common from openstackclient.network import common @@ -91,7 +92,7 @@ class CreateSecurityGroupRule( "ending port range: 137:139. Required for IP protocols TCP " "and UDP. Ignored for ICMP IP protocols." ), - **dst_port_default + **dst_port_default, ) # NOTE(rtheis): Support either protocol option name for now. @@ -125,7 +126,7 @@ class CreateSecurityGroupRule( metavar='', type=network_utils.convert_to_lowercase, help=protocol_help, - **proto_choices + **proto_choices, ) if not self.is_docs_build: protocol_group.add_argument( @@ -133,7 +134,7 @@ class CreateSecurityGroupRule( metavar='', type=network_utils.convert_to_lowercase, help=argparse.SUPPRESS, - **proto_choices + **proto_choices, ) return parser @@ -292,7 +293,7 @@ class CreateSecurityGroupRule( return (display_columns, data) def take_action_compute(self, client, parsed_args): - group = client.api.security_group_find(parsed_args.group) + group = compute_v2.find_security_group(client, parsed_args.group) protocol = network_utils.get_protocol( parsed_args, default_protocol='tcp' ) @@ -303,15 +304,16 @@ class CreateSecurityGroupRule( remote_ip = None if parsed_args.remote_group is not None: - parsed_args.remote_group = client.api.security_group_find( - parsed_args.remote_group, + parsed_args.remote_group = compute_v2.find_security_group( + client, parsed_args.remote_group )['id'] if parsed_args.remote_ip is not None: remote_ip = parsed_args.remote_ip else: remote_ip = '0.0.0.0/0' - obj = client.api.security_group_rule_create( + obj = compute_v2.create_security_group_rule( + client, security_group_id=group['id'], ip_protocol=protocol, from_port=from_port, @@ -343,7 +345,7 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): client.delete_security_group_rule(obj) def take_action_compute(self, client, parsed_args): - client.api.security_group_rule_delete(self.r) + compute_v2.delete_security_group_rule(client, self.r) class ListSecurityGroupRule(common.NetworkAndComputeLister): @@ -532,15 +534,16 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister): rules_to_list = [] if parsed_args.group is not None: - group = client.api.security_group_find( - parsed_args.group, + security_group = compute_v2.find_security_group( + client, parsed_args.group ) - rules_to_list = group['rules'] + rules_to_list = security_group['rules'] else: columns = columns + ('parent_group_id',) - search = {'all_tenants': parsed_args.all_projects} - for group in client.api.security_group_list(search_opts=search): - rules_to_list.extend(group['rules']) + for security_group in compute_v2.list_security_groups( + client, all_projects=parsed_args.all_projects + ): + rules_to_list.extend(security_group['rules']) # NOTE(rtheis): Turn the raw rules into resources. rules = [] @@ -596,7 +599,7 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne): # the requested rule. obj = None security_group_rules = [] - for security_group in client.api.security_group_list(): + for security_group in compute_v2.list_security_groups(client): security_group_rules.extend(security_group['rules']) for security_group_rule in security_group_rules: if parsed_args.rule == str(security_group_rule.get('id')): diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py index fd2c521c7a..01ebf7b929 100644 --- a/openstackclient/tests/unit/api/test_compute_v2.py +++ b/openstackclient/tests/unit/api/test_compute_v2.py @@ -17,652 +17,71 @@ import http from unittest import mock import uuid -from keystoneauth1 import session from openstack.compute.v2 import _proxy from osc_lib import exceptions as osc_lib_exceptions -from requests_mock.contrib import fixture from openstackclient.api import compute_v2 as compute from openstackclient.tests.unit import fakes from openstackclient.tests.unit import utils -FAKE_PROJECT = 'xyzpdq' -FAKE_URL = 'http://gopher.com/v2' - - -class TestComputeAPIv2(utils.TestCase): - def setUp(self): - super().setUp() - sess = session.Session() - self.api = compute.APIv2(session=sess, endpoint=FAKE_URL) - self.requests_mock = self.useFixture(fixture.Fixture()) - - -class TestFloatingIP(TestComputeAPIv2): - FAKE_FLOATING_IP_RESP = { - 'id': 1, - 'ip': '203.0.113.11', # TEST-NET-3 - 'fixed_ip': '198.51.100.11', # TEST-NET-2 - 'pool': 'nova', - 'instance_id': None, - } - FAKE_FLOATING_IP_RESP_2 = { - 'id': 2, - 'ip': '203.0.113.12', # TEST-NET-3 - 'fixed_ip': '198.51.100.12', # TEST-NET-2 - 'pool': 'nova', - 'instance_id': None, - } - LIST_FLOATING_IP_RESP = [ - FAKE_FLOATING_IP_RESP, - FAKE_FLOATING_IP_RESP_2, - ] - - FAKE_SERVER_RESP_1 = { - 'id': 1, - 'name': 'server1', - } - - def test_floating_ip_add_id(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/servers/1/action', - json={'server': {}}, - status_code=200, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/servers/1', - json={'server': self.FAKE_SERVER_RESP_1}, - status_code=200, - ) - ret = self.api.floating_ip_add('1', '1.0.1.0') - self.assertEqual(200, ret.status_code) - - def test_floating_ip_add_name(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/servers/1/action', - json={'server': {}}, - status_code=200, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/servers/server1', - json={'server': self.FAKE_SERVER_RESP_1}, - status_code=200, - ) - ret = self.api.floating_ip_add('server1', '1.0.1.0') - self.assertEqual(200, ret.status_code) - - def test_floating_ip_create(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-floating-ips', - json={'floating_ip': self.FAKE_FLOATING_IP_RESP}, - status_code=200, - ) - ret = self.api.floating_ip_create('nova') - self.assertEqual(self.FAKE_FLOATING_IP_RESP, ret) - - def test_floating_ip_create_not_found(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-floating-ips', - status_code=404, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.floating_ip_create, - 'not-nova', - ) - - def test_floating_ip_delete(self): - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-floating-ips/1', - status_code=202, - ) - ret = self.api.floating_ip_delete('1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - def test_floating_ip_delete_none(self): - ret = self.api.floating_ip_delete() - self.assertIsNone(ret) - - def test_floating_ip_find_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips/1', - json={'floating_ip': self.FAKE_FLOATING_IP_RESP}, - status_code=200, - ) - ret = self.api.floating_ip_find('1') - self.assertEqual(self.FAKE_FLOATING_IP_RESP, ret) - - def test_floating_ip_find_ip(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips/' + self.FAKE_FLOATING_IP_RESP['ip'], - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips', - json={'floating_ips': self.LIST_FLOATING_IP_RESP}, - status_code=200, - ) - ret = self.api.floating_ip_find(self.FAKE_FLOATING_IP_RESP['ip']) - self.assertEqual(self.FAKE_FLOATING_IP_RESP, ret) - - def test_floating_ip_find_not_found(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips/1.2.3.4', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips', - json={'floating_ips': self.LIST_FLOATING_IP_RESP}, - status_code=200, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.floating_ip_find, - '1.2.3.4', - ) - - def test_floating_ip_list(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ips', - json={'floating_ips': self.LIST_FLOATING_IP_RESP}, - status_code=200, - ) - ret = self.api.floating_ip_list() - self.assertEqual(self.LIST_FLOATING_IP_RESP, ret) - - def test_floating_ip_remove_id(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/servers/1/action', - status_code=200, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/servers/1', - json={'server': self.FAKE_SERVER_RESP_1}, - status_code=200, - ) - ret = self.api.floating_ip_remove('1', '1.0.1.0') - self.assertEqual(200, ret.status_code) - - def test_floating_ip_remove_name(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/servers/1/action', - status_code=200, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/servers/server1', - json={'server': self.FAKE_SERVER_RESP_1}, - status_code=200, - ) - ret = self.api.floating_ip_remove('server1', '1.0.1.0') - self.assertEqual(200, ret.status_code) - - -class TestFloatingIPPool(TestComputeAPIv2): - LIST_FLOATING_IP_POOL_RESP = [ - {"name": "tide"}, - {"name": "press"}, - ] - - def test_floating_ip_pool_list(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-floating-ip-pools', - json={'floating_ip_pools': self.LIST_FLOATING_IP_POOL_RESP}, - status_code=200, - ) - ret = self.api.floating_ip_pool_list() - self.assertEqual(self.LIST_FLOATING_IP_POOL_RESP, ret) - - -class TestNetwork(TestComputeAPIv2): - FAKE_NETWORK_RESP = { - 'id': '1', - 'label': 'label1', - 'cidr': '1.2.3.0/24', - } - - FAKE_NETWORK_RESP_2 = { - 'id': '2', - 'label': 'label2', - 'cidr': '4.5.6.0/24', - } - - LIST_NETWORK_RESP = [ - FAKE_NETWORK_RESP, - FAKE_NETWORK_RESP_2, - ] - - def test_network_create_default(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-networks', - json={'network': self.FAKE_NETWORK_RESP}, - status_code=200, - ) - ret = self.api.network_create('label1') - self.assertEqual(self.FAKE_NETWORK_RESP, ret) - - def test_network_create_options(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-networks', - json={'network': self.FAKE_NETWORK_RESP}, - status_code=200, - ) - ret = self.api.network_create( - name='label1', - subnet='1.2.3.0/24', - ) - self.assertEqual(self.FAKE_NETWORK_RESP, ret) - - def test_network_delete_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/1', - json={'network': self.FAKE_NETWORK_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-networks/1', - status_code=202, - ) - ret = self.api.network_delete('1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - def test_network_delete_name(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/label1', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks', - json={'networks': self.LIST_NETWORK_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-networks/1', - status_code=202, - ) - ret = self.api.network_delete('label1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - def test_network_delete_not_found(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/label3', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks', - json={'networks': self.LIST_NETWORK_RESP}, - status_code=200, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.network_delete, - 'label3', - ) - - def test_network_find_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/1', - json={'network': self.FAKE_NETWORK_RESP}, - status_code=200, - ) - ret = self.api.network_find('1') - self.assertEqual(self.FAKE_NETWORK_RESP, ret) - - def test_network_find_name(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/label2', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks', - json={'networks': self.LIST_NETWORK_RESP}, - status_code=200, - ) - ret = self.api.network_find('label2') - self.assertEqual(self.FAKE_NETWORK_RESP_2, ret) - - def test_network_find_not_found(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks/label3', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks', - json={'networks': self.LIST_NETWORK_RESP}, - status_code=200, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.network_find, - 'label3', - ) - - def test_network_list_no_options(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-networks', - json={'networks': self.LIST_NETWORK_RESP}, - status_code=200, - ) - ret = self.api.network_list() - self.assertEqual(self.LIST_NETWORK_RESP, ret) - - -class TestSecurityGroup(TestComputeAPIv2): - FAKE_SECURITY_GROUP_RESP = { - 'id': '1', - 'name': 'sg1', - 'description': 'test security group', - 'tenant_id': '0123456789', - 'rules': [], - } - FAKE_SECURITY_GROUP_RESP_2 = { - 'id': '2', - 'name': 'sg2', - 'description': 'another test security group', - 'tenant_id': '0123456789', - 'rules': [], - } - LIST_SECURITY_GROUP_RESP = [ - FAKE_SECURITY_GROUP_RESP_2, - FAKE_SECURITY_GROUP_RESP, - ] - - def test_security_group_create_default(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-security-groups', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_create('sg1') - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) - - def test_security_group_create_options(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-security-groups', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_create( - name='sg1', - description='desc', - ) - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) - - def test_security_group_delete_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/1', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-security-groups/1', - status_code=202, - ) - ret = self.api.security_group_delete('1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - def test_security_group_delete_name(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/sg1', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-security-groups/1', - status_code=202, - ) - ret = self.api.security_group_delete('sg1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - def test_security_group_delete_not_found(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/sg3', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.security_group_delete, - 'sg3', - ) - - def test_security_group_find_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/1', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_find('1') - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) - - def test_security_group_find_name(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/sg2', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_find('sg2') - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) - - def test_security_group_find_not_found(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/sg3', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.assertRaises( - osc_lib_exceptions.NotFound, - self.api.security_group_find, - 'sg3', - ) - - def test_security_group_list_no_options(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_list() - self.assertEqual(self.LIST_SECURITY_GROUP_RESP, ret) - - def test_security_group_set_options_id(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/1', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'PUT', - FAKE_URL + '/os-security-groups/1', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, - status_code=200, - ) - ret = self.api.security_group_set( - security_group='1', description='desc2' - ) - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) - - def test_security_group_set_options_name(self): - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups/sg2', - status_code=404, - ) - self.requests_mock.register_uri( - 'GET', - FAKE_URL + '/os-security-groups', - json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, - status_code=200, - ) - self.requests_mock.register_uri( - 'PUT', - FAKE_URL + '/os-security-groups/2', - json={'security_group': self.FAKE_SECURITY_GROUP_RESP_2}, - status_code=200, - ) - ret = self.api.security_group_set( - security_group='sg2', description='desc2' - ) - self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) - - -class TestSecurityGroupRule(TestComputeAPIv2): - FAKE_SECURITY_GROUP_RULE_RESP = { - 'id': '1', - 'name': 'sgr1', - 'tenant_id': 'proj-1', - 'ip_protocol': 'TCP', - 'from_port': 1, - 'to_port': 22, - 'group': {}, - # 'ip_range': , - # 'cidr': , - # 'parent_group_id': , - } - - def test_security_group_create_no_options(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-security-group-rules', - json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, - status_code=200, - ) - ret = self.api.security_group_rule_create( - security_group_id='1', - ip_protocol='tcp', - ) - self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) - - def test_security_group_create_options(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-security-group-rules', - json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, - status_code=200, - ) - ret = self.api.security_group_rule_create( - security_group_id='1', - ip_protocol='tcp', - from_port=22, - to_port=22, - remote_ip='1.2.3.4/24', - ) - self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) - - def test_security_group_create_port_errors(self): - self.requests_mock.register_uri( - 'POST', - FAKE_URL + '/os-security-group-rules', - json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, - status_code=200, - ) - self.assertRaises( - compute.InvalidValue, - self.api.security_group_rule_create, - security_group_id='1', - ip_protocol='tcp', - from_port='', - to_port=22, - remote_ip='1.2.3.4/24', - ) - self.assertRaises( - compute.InvalidValue, - self.api.security_group_rule_create, - security_group_id='1', - ip_protocol='tcp', - from_port=0, - to_port=[], - remote_ip='1.2.3.4/24', - ) - - def test_security_group_rule_delete(self): - self.requests_mock.register_uri( - 'DELETE', - FAKE_URL + '/os-security-group-rules/1', - status_code=202, - ) - ret = self.api.security_group_rule_delete('1') - self.assertEqual(202, ret.status_code) - self.assertEqual("", ret.text) - - -class TestFindSecurityGroup(utils.TestCase): +class TestSecurityGroup(utils.TestCase): def setUp(self): super().setUp() self.compute_sdk_client = mock.Mock(_proxy.Proxy) + def test_create_security_group(self): + sg_name = 'name-' + uuid.uuid4().hex + sg_description = 'description-' + uuid.uuid4().hex + data = { + 'security_group': { + 'id': uuid.uuid4().hex, + 'name': sg_name, + 'description': sg_description, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + } + } + self.compute_sdk_client.post.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.create_security_group( + self.compute_sdk_client, sg_name, sg_description + ) + + self.compute_sdk_client.post.assert_called_once_with( + '/os-security-groups', + data={'name': sg_name, 'description': sg_description}, + microversion='2.1', + ) + self.assertEqual(data['security_group'], result) + + def test_list_security_groups(self): + data = { + 'security_groups': [ + { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'description': 'description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + } + ], + } + self.compute_sdk_client.get.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.list_security_groups(self.compute_sdk_client) + + self.compute_sdk_client.get.assert_called_once_with( + '/os-security-groups', microversion='2.1' + ) + self.assertEqual(data['security_groups'], result) + def test_find_security_group_by_id(self): sg_id = uuid.uuid4().hex sg_name = 'name-' + uuid.uuid4().hex @@ -764,14 +183,174 @@ class TestFindSecurityGroup(utils.TestCase): sg_name, ) + def test_update_security_group(self): + sg_id = uuid.uuid4().hex + sg_name = 'name-' + uuid.uuid4().hex + sg_description = 'description-' + uuid.uuid4().hex + data = { + 'security_group': { + 'id': sg_id, + 'name': sg_name, + 'description': sg_description, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + } + } + self.compute_sdk_client.put.return_value = fakes.FakeResponse( + data=data + ) -class TestFindNetwork(utils.TestCase): + result = compute.update_security_group( + self.compute_sdk_client, sg_id, sg_name, sg_description + ) + + self.compute_sdk_client.put.assert_called_once_with( + f'/os-security-groups/{sg_id}', + data={'name': sg_name, 'description': sg_description}, + microversion='2.1', + ) + self.assertEqual(data['security_group'], result) + + def test_delete_security_group(self): + sg_id = uuid.uuid4().hex + self.compute_sdk_client.delete.return_value = fakes.FakeResponse( + status_code=http.HTTPStatus.NO_CONTENT + ) + + result = compute.delete_security_group(self.compute_sdk_client, sg_id) + + self.compute_sdk_client.delete.assert_called_once_with( + f'/os-security-groups/{sg_id}', + microversion='2.1', + ) + self.assertIsNone(result) + + +class TestSecurityGroupRule(utils.TestCase): def setUp(self): super().setUp() self.compute_sdk_client = mock.Mock(_proxy.Proxy) + def test_create_security_group_rule(self): + sg_id = uuid.uuid4().hex + data = { + 'security_group_rule': { + 'parent_group_id': sg_id, + 'ip_protocol': 'tcp', + 'from_port': 22, + 'to_port': 22, + 'cidr': '10.0.0.0/24', + } + } + self.compute_sdk_client.post.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.create_security_group_rule( + self.compute_sdk_client, + security_group_id=sg_id, + ip_protocol='tcp', + from_port=22, + to_port=22, + remote_ip='10.0.0.0/24', + remote_group=None, + ) + + self.compute_sdk_client.post.assert_called_once_with( + '/os-security-group-rules', + data={ + 'parent_group_id': sg_id, + 'ip_protocol': 'tcp', + 'from_port': 22, + 'to_port': 22, + 'cidr': '10.0.0.0/24', + 'group_id': None, + }, + microversion='2.1', + ) + self.assertEqual(data['security_group_rule'], result) + + def test_delete_security_group_rule(self): + sg_id = uuid.uuid4().hex + self.compute_sdk_client.delete.return_value = fakes.FakeResponse( + status_code=http.HTTPStatus.NO_CONTENT + ) + + result = compute.delete_security_group_rule( + self.compute_sdk_client, sg_id + ) + + self.compute_sdk_client.delete.assert_called_once_with( + f'/os-security-group-rules/{sg_id}', + microversion='2.1', + ) + self.assertIsNone(result) + + +class TestNetwork(utils.TestCase): + + def setUp(self): + super().setUp() + + self.compute_sdk_client = mock.Mock(_proxy.Proxy) + + def test_create_network(self): + net_name = 'name-' + uuid.uuid4().hex + net_subnet = '10.0.0.0/24' + data = { + 'network': { + 'id': uuid.uuid4().hex, + 'label': net_name, + 'cidr': net_subnet, + 'share_address': True, + # other fields omitted for brevity + } + } + self.compute_sdk_client.post.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.create_network( + self.compute_sdk_client, + name=net_name, + subnet=net_subnet, + share_subnet=True, + ) + + self.compute_sdk_client.post.assert_called_once_with( + '/os-networks', + data={ + 'label': net_name, + 'cidr': net_subnet, + 'share_address': True, + }, + microversion='2.1', + ) + self.assertEqual(data['network'], result) + + def test_list_networks(self): + data = { + 'networks': [ + { + 'id': uuid.uuid4().hex, + 'label': f'name-{uuid.uuid4().hex}', + # other fields omitted for brevity + } + ], + } + self.compute_sdk_client.get.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.list_networks(self.compute_sdk_client) + + self.compute_sdk_client.get.assert_called_once_with( + '/os-networks', microversion='2.1' + ) + self.assertEqual(data['networks'], result) + def test_find_network_by_id(self): net_id = uuid.uuid4().hex net_name = 'name-' + uuid.uuid4().hex @@ -862,3 +441,133 @@ class TestFindNetwork(utils.TestCase): self.compute_sdk_client, net_name, ) + + def test_delete_network(self): + net_id = uuid.uuid4().hex + self.compute_sdk_client.delete.return_value = fakes.FakeResponse( + status_code=http.HTTPStatus.NO_CONTENT + ) + + result = compute.delete_network(self.compute_sdk_client, net_id) + + self.compute_sdk_client.delete.assert_called_once_with( + f'/os-networks/{net_id}', microversion='2.1' + ) + self.assertIsNone(result) + + +class TestFloatingIP(utils.TestCase): + + def setUp(self): + super().setUp() + + self.compute_sdk_client = mock.Mock(_proxy.Proxy) + + def test_create_floating_ip(self): + network = 'network-' + uuid.uuid4().hex + data = { + 'floating_ip': { + 'fixed_ip': None, + 'id': uuid.uuid4().hex, + 'instance_id': None, + 'ip': '172.24.4.17', + 'pool': network, + } + } + self.compute_sdk_client.post.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.create_floating_ip( + self.compute_sdk_client, network=network + ) + + self.compute_sdk_client.post.assert_called_once_with( + '/os-floating-ips', data={'pool': network}, microversion='2.1' + ) + self.assertEqual(data['floating_ip'], result) + + def test_list_floating_ips(self): + data = { + 'floating_ips': [ + { + 'fixed_ip': None, + 'id': uuid.uuid4().hex, + 'instance_id': None, + 'ip': '172.24.4.17', + 'pool': f'network-{uuid.uuid4().hex}', + } + ], + } + self.compute_sdk_client.get.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.list_floating_ips(self.compute_sdk_client) + + self.compute_sdk_client.get.assert_called_once_with( + '/os-floating-ips', microversion='2.1' + ) + self.assertEqual(data['floating_ips'], result) + + def test_get_floating_ip(self): + fip_id = uuid.uuid4().hex + data = { + 'floating_ip': { + 'fixed_ip': None, + 'id': fip_id, + 'instance_id': None, + 'ip': '172.24.4.17', + 'pool': f'network-{uuid.uuid4().hex}', + } + } + self.compute_sdk_client.get.side_effect = [ + fakes.FakeResponse(data=data), + ] + + result = compute.get_floating_ip(self.compute_sdk_client, fip_id) + + self.compute_sdk_client.get.assert_called_once_with( + f'/os-floating-ips/{fip_id}', microversion='2.1' + ) + self.assertEqual(data['floating_ip'], result) + + def test_delete_floating_ip(self): + fip_id = uuid.uuid4().hex + self.compute_sdk_client.delete.return_value = fakes.FakeResponse( + status_code=http.HTTPStatus.NO_CONTENT + ) + + result = compute.delete_floating_ip(self.compute_sdk_client, fip_id) + + self.compute_sdk_client.delete.assert_called_once_with( + f'/os-floating-ips/{fip_id}', microversion='2.1' + ) + self.assertIsNone(result) + + +class TestFloatingIPPool(utils.TestCase): + + def setUp(self): + super().setUp() + + self.compute_sdk_client = mock.Mock(_proxy.Proxy) + + def test_list_floating_ip_pools(self): + data = { + 'floating_ip_pools': [ + { + 'name': f'pool-{uuid.uuid4().hex}', + } + ], + } + self.compute_sdk_client.get.return_value = fakes.FakeResponse( + data=data + ) + + result = compute.list_floating_ip_pools(self.compute_sdk_client) + + self.compute_sdk_client.get.assert_called_once_with( + '/os-floating-ip-pools', microversion='2.1' + ) + self.assertEqual(data['floating_ip_pools'], result) diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py index 3e89d49057..5a19ad56ae 100644 --- a/openstackclient/tests/unit/compute/v2/fakes.py +++ b/openstackclient/tests/unit/compute/v2/fakes.py @@ -39,7 +39,6 @@ from openstack.compute.v2 import service as _service from openstack.compute.v2 import usage as _usage from openstack.compute.v2 import volume_attachment as _volume_attachment -from openstackclient.api import compute_v2 from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes from openstackclient.tests.unit.image.v2 import fakes as image_fakes @@ -154,11 +153,6 @@ class FakeClientMixin: ) self.compute_client = self.app.client_manager.compute - self.compute_client.api = compute_v2.APIv2( - session=self.app.client_manager.session, - endpoint=fakes.AUTH_URL, - ) - # TODO(stephenfin): Rename to 'compute_client' once all commands are # migrated to SDK self.app.client_manager.sdk_connection.compute = mock.Mock( diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py index 8902b65015..4b87dc85b5 100644 --- a/openstackclient/tests/unit/compute/v2/test_server.py +++ b/openstackclient/tests/unit/compute/v2/test_server.py @@ -392,57 +392,57 @@ class TestServerAddFixedIP(TestServer): ) -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_add') class TestServerAddFloatingIPCompute(compute_fakes.TestComputev2): def setUp(self): super().setUp() self.app.client_manager.network_endpoint_enabled = False + self.server = compute_fakes.create_one_sdk_server() + self.compute_sdk_client.find_server.return_value = self.server - # Get the command object to test self.cmd = server.AddFloatingIP(self.app, None) - def test_server_add_floating_ip_default(self, fip_mock): - _floating_ip = compute_fakes.create_one_floating_ip() + def test_server_add_floating_ip_default(self): arglist = [ - 'server1', - _floating_ip['ip'], + self.server.name, + '1.2.3.4', ] verifylist = [ - ('server', 'server1'), - ('ip_address', _floating_ip['ip']), + ('server', self.server.name), + ('ip_address', '1.2.3.4'), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with( - 'server1', - _floating_ip['ip'], - fixed_address=None, + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.name, ignore_missing=False + ) + self.compute_sdk_client.add_floating_ip_to_server.assert_called_once_with( + self.server, '1.2.3.4', fixed_address=None ) - def test_server_add_floating_ip_fixed(self, fip_mock): - _floating_ip = compute_fakes.create_one_floating_ip() + def test_server_add_floating_ip_fixed(self): arglist = [ '--fixed-ip-address', - _floating_ip['fixed_ip'], - 'server1', - _floating_ip['ip'], + '5.6.7.8', + self.server.name, + '1.2.3.4', ] verifylist = [ - ('fixed_ip_address', _floating_ip['fixed_ip']), - ('server', 'server1'), - ('ip_address', _floating_ip['ip']), + ('fixed_ip_address', '5.6.7.8'), + ('server', self.server.name), + ('ip_address', '1.2.3.4'), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with( - 'server1', - _floating_ip['ip'], - fixed_address=_floating_ip['fixed_ip'], + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.name, ignore_missing=False + ) + self.compute_sdk_client.add_floating_ip_to_server.assert_called_once_with( + self.server, '1.2.3.4', fixed_address='5.6.7.8' ) @@ -7267,34 +7267,34 @@ class TestServerRescue(compute_fakes.TestComputev2): self.assertIsNone(result) -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_remove') class TestServerRemoveFloatingIPCompute(compute_fakes.TestComputev2): def setUp(self): super().setUp() self.app.client_manager.network_endpoint_enabled = False + self.server = compute_fakes.create_one_sdk_server() + self.compute_sdk_client.find_server.return_value = self.server - # Get the command object to test self.cmd = server.RemoveFloatingIP(self.app, None) - def test_server_remove_floating_ip(self, fip_mock): - _floating_ip = compute_fakes.create_one_floating_ip() - + def test_server_remove_floating_ip(self): arglist = [ - 'server1', - _floating_ip['ip'], + self.server.name, + '1.2.3.4', ] verifylist = [ - ('server', 'server1'), - ('ip_address', _floating_ip['ip']), + ('server', self.server.name), + ('ip_address', '1.2.3.4'), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with( - 'server1', - _floating_ip['ip'], + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.name, ignore_missing=False + ) + self.compute_sdk_client.remove_floating_ip_from_server.assert_called_once_with( + self.server, '1.2.3.4' ) diff --git a/openstackclient/tests/unit/network/test_common.py b/openstackclient/tests/unit/network/test_common.py index c1ad9b28d2..a84697b2ca 100644 --- a/openstackclient/tests/unit/network/test_common.py +++ b/openstackclient/tests/unit/network/test_common.py @@ -132,8 +132,8 @@ class TestNetworkAndCompute(utils.TestCommand): return_value='take_action_network' ) - self.app.client_manager.compute = mock.Mock() - self.compute_client = self.app.client_manager.compute + self.app.client_manager.sdk_connection.compute = mock.Mock() + self.compute_client = self.app.client_manager.sdk_connection.compute self.compute_client.compute_action = mock.Mock( return_value='take_action_compute' ) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py index b7ca587517..d310f199af 100644 --- a/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py @@ -12,21 +12,17 @@ # from unittest import mock -from unittest.mock import call from osc_lib import exceptions +from openstackclient.api import compute_v2 from openstackclient.network.v2 import floating_ip as fip from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit import utils as tests_utils -# Tests for Nova network - - -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_create') +@mock.patch.object(compute_v2, 'create_floating_ip') class TestCreateFloatingIPCompute(compute_fakes.TestComputev2): - # The floating ip to be deleted. _floating_ip = compute_fakes.create_one_floating_ip() columns = ( @@ -50,9 +46,6 @@ class TestCreateFloatingIPCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # self.compute_client.floating_ips.create.return_value = self.floating_ip - - # Get the command object to test self.cmd = fip.CreateFloatingIP(self.app, None) def test_floating_ip_create_no_arg(self, fip_mock): @@ -79,14 +72,15 @@ class TestCreateFloatingIPCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with(self._floating_ip['pool']) + fip_mock.assert_called_once_with( + self.compute_sdk_client, self._floating_ip['pool'] + ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_delete') +@mock.patch.object(compute_v2, 'delete_floating_ip') class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2): - # The floating ips to be deleted. _floating_ips = compute_fakes.create_floating_ips(count=2) def setUp(self): @@ -94,7 +88,6 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = fip.DeleteFloatingIP(self.app, None) def test_floating_ip_delete(self, fip_mock): @@ -109,27 +102,34 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with(self._floating_ips[0]['id']) + fip_mock.assert_called_once_with( + self.compute_sdk_client, self._floating_ips[0]['id'] + ) self.assertIsNone(result) def test_floating_ip_delete_multi(self, fip_mock): fip_mock.return_value = mock.Mock(return_value=None) - arglist = [] - verifylist = [] - - for f in self._floating_ips: - arglist.append(f['id']) + arglist = [ + self._floating_ips[0]['id'], + self._floating_ips[1]['id'], + ] verifylist = [ ('floating_ip', arglist), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - calls = [] - for f in self._floating_ips: - calls.append(call(f['id'])) - fip_mock.assert_has_calls(calls) + fip_mock.assert_has_calls( + [ + mock.call( + self.compute_sdk_client, self._floating_ips[0]['id'] + ), + mock.call( + self.compute_sdk_client, self._floating_ips[1]['id'] + ), + ] + ) self.assertIsNone(result) def test_floating_ip_delete_multi_exception(self, fip_mock): @@ -156,13 +156,16 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2): except exceptions.CommandError as e: self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - fip_mock.assert_any_call(self._floating_ips[0]['id']) - fip_mock.assert_any_call('unexist_floating_ip') + fip_mock.assert_any_call( + self.compute_sdk_client, self._floating_ips[0]['id'] + ) + fip_mock.assert_any_call( + self.compute_sdk_client, 'unexist_floating_ip' + ) -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_list') +@mock.patch.object(compute_v2, 'list_floating_ips') class TestListFloatingIPCompute(compute_fakes.TestComputev2): - # The floating ips to be list up _floating_ips = compute_fakes.create_floating_ips(count=3) columns = ( @@ -190,7 +193,6 @@ class TestListFloatingIPCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = fip.ListFloatingIP(self.app, None) def test_floating_ip_list(self, fip_mock): @@ -201,14 +203,13 @@ class TestListFloatingIPCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with() + fip_mock.assert_called_once_with(self.compute_sdk_client) self.assertEqual(self.columns, columns) self.assertEqual(self.data, list(data)) -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_find') +@mock.patch.object(compute_v2, 'get_floating_ip') class TestShowFloatingIPCompute(compute_fakes.TestComputev2): - # The floating ip to display. _floating_ip = compute_fakes.create_one_floating_ip() columns = ( @@ -232,7 +233,6 @@ class TestShowFloatingIPCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = fip.ShowFloatingIP(self.app, None) def test_floating_ip_show(self, fip_mock): @@ -247,6 +247,8 @@ class TestShowFloatingIPCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - fip_mock.assert_called_once_with(self._floating_ip['id']) + fip_mock.assert_called_once_with( + self.compute_sdk_client, self._floating_ip['id'] + ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py index 29084c15b8..26fc3918e5 100644 --- a/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py @@ -13,14 +13,12 @@ from unittest import mock +from openstackclient.api import compute_v2 from openstackclient.network.v2 import floating_ip_pool from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes -# Tests for Compute network - - -@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_pool_list') +@mock.patch.object(compute_v2, 'list_floating_ip_pools') class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2): # The floating ip pools to list up _floating_ip_pools = compute_fakes.create_floating_ip_pools(count=3) @@ -36,7 +34,6 @@ class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None) def test_floating_ip_list(self, fipp_mock): @@ -47,6 +44,6 @@ class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - fipp_mock.assert_called_once_with() + fipp_mock.assert_called_once_with(self.compute_sdk_client) self.assertEqual(self.columns, columns) self.assertEqual(self.data, list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_network_compute.py b/openstackclient/tests/unit/network/v2/test_network_compute.py index 651bf72a58..045fa1f6b6 100644 --- a/openstackclient/tests/unit/network/v2/test_network_compute.py +++ b/openstackclient/tests/unit/network/v2/test_network_compute.py @@ -12,22 +12,17 @@ # from unittest import mock -from unittest.mock import call from osc_lib import exceptions +from openstackclient.api import compute_v2 from openstackclient.network.v2 import network from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit import utils as tests_utils -# Tests for Nova network -# - - -@mock.patch('openstackclient.api.compute_v2.APIv2.network_create') +@mock.patch.object(compute_v2, 'create_network') class TestCreateNetworkCompute(compute_fakes.TestComputev2): - # The network to create. _network = compute_fakes.create_one_network() columns = ( @@ -105,7 +100,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = network.CreateNetwork(self.app, None) def test_network_create_no_options(self, net_mock): @@ -113,7 +107,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2): arglist = [] verifylist = [] - # Missing required args should raise exception here self.assertRaises( tests_utils.ParserException, self.check_parser, @@ -131,7 +124,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2): ('name', self._network['label']), ] - # Missing required args should raise exception here self.assertRaises( tests_utils.ParserException, self.check_parser, @@ -156,35 +148,29 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) net_mock.assert_called_once_with( - **{ - 'subnet': self._network['cidr'], - 'name': self._network['label'], - } + self.compute_sdk_client, + subnet=self._network['cidr'], + name=self._network['label'], ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) -@mock.patch('openstackclient.api.compute_v2.APIv2.network_delete') +@mock.patch.object(compute_v2, 'delete_network') +@mock.patch.object(compute_v2, 'find_network') class TestDeleteNetworkCompute(compute_fakes.TestComputev2): def setUp(self): super().setUp() self.app.client_manager.network_endpoint_enabled = False - # The networks to delete self._networks = compute_fakes.create_networks(count=3) - # Return value of utils.find_resource() - self.compute_client.api.network_find = compute_fakes.get_networks( - networks=self._networks - ) - - # Get the command object to test self.cmd = network.DeleteNetwork(self.app, None) - def test_network_delete_one(self, net_mock): - net_mock.return_value = mock.Mock(return_value=None) + def test_network_delete_one(self, find_net_mock, delete_net_mock): + find_net_mock.side_effect = self._networks + delete_net_mock.return_value = mock.Mock(return_value=None) arglist = [ self._networks[0]['label'], ] @@ -195,35 +181,44 @@ class TestDeleteNetworkCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) - net_mock.assert_called_once_with( - self._networks[0]['label'], + delete_net_mock.assert_called_once_with( + self.compute_sdk_client, + self._networks[0]['id'], ) self.assertIsNone(result) - def test_network_delete_multi(self, net_mock): - net_mock.return_value = mock.Mock(return_value=None) - arglist = [] - for n in self._networks: - arglist.append(n['id']) + def test_network_delete_multi(self, find_net_mock, delete_net_mock): + find_net_mock.side_effect = self._networks + delete_net_mock.return_value = mock.Mock(return_value=None) + arglist = [ + self._networks[0]['id'], + self._networks[1]['id'], + ] verifylist = [ ('network', arglist), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - calls = [] - for n in self._networks: - calls.append(call(n['id'])) - net_mock.assert_has_calls(calls) + delete_net_mock.assert_has_calls( + [ + mock.call(self.compute_sdk_client, self._networks[0]['id']), + mock.call(self.compute_sdk_client, self._networks[1]['id']), + ] + ) self.assertIsNone(result) - def test_network_delete_multi_with_exception(self, net_mock): - net_mock.return_value = mock.Mock(return_value=None) - net_mock.side_effect = [ - mock.Mock(return_value=None), - exceptions.CommandError, + def test_network_delete_multi_with_exception( + self, find_net_mock, delete_net_mock + ): + find_net_mock.side_effect = [ + self._networks[0], + exceptions.NotFound('foo'), + self._networks[1], ] + delete_net_mock.return_value = mock.Mock(return_value=None) + arglist = [ self._networks[0]['id'], 'xxxx-yyyy-zzzz', @@ -234,20 +229,30 @@ class TestDeleteNetworkCompute(compute_fakes.TestComputev2): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('2 of 3 networks failed to delete.', str(e)) + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args, + ) + self.assertEqual('1 of 3 networks failed to delete.', str(exc)) - net_mock.assert_any_call(self._networks[0]['id']) - net_mock.assert_any_call(self._networks[1]['id']) - net_mock.assert_any_call('xxxx-yyyy-zzzz') + find_net_mock.assert_has_calls( + [ + mock.call(self.compute_sdk_client, self._networks[0]['id']), + mock.call(self.compute_sdk_client, 'xxxx-yyyy-zzzz'), + mock.call(self.compute_sdk_client, self._networks[1]['id']), + ] + ) + delete_net_mock.assert_has_calls( + [ + mock.call(self.compute_sdk_client, self._networks[0]['id']), + mock.call(self.compute_sdk_client, self._networks[1]['id']), + ] + ) -@mock.patch('openstackclient.api.compute_v2.APIv2.network_list') +@mock.patch.object(compute_v2, 'list_networks') class TestListNetworkCompute(compute_fakes.TestComputev2): - # The networks going to be listed up. _networks = compute_fakes.create_networks(count=3) columns = ( @@ -271,7 +276,6 @@ class TestListNetworkCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = network.ListNetwork(self.app, None) def test_network_list_no_options(self, net_mock): @@ -280,19 +284,15 @@ class TestListNetworkCompute(compute_fakes.TestComputev2): verifylist = [] parsed_args = self.check_parser(self.cmd, arglist, verifylist) - # In base command class Lister in cliff, abstract method take_action() - # returns a tuple containing the column names and an iterable - # containing the data to be listed. columns, data = self.cmd.take_action(parsed_args) - net_mock.assert_called_once_with() + net_mock.assert_called_once_with(self.compute_sdk_client) self.assertEqual(self.columns, columns) self.assertEqual(self.data, list(data)) -@mock.patch('openstackclient.api.compute_v2.APIv2.network_find') +@mock.patch.object(compute_v2, 'find_network') class TestShowNetworkCompute(compute_fakes.TestComputev2): - # The network to show. _network = compute_fakes.create_one_network() columns = ( @@ -370,7 +370,6 @@ class TestShowNetworkCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - # Get the command object to test self.cmd = network.ShowNetwork(self.app, None) def test_show_no_options(self, net_mock): @@ -398,6 +397,8 @@ class TestShowNetworkCompute(compute_fakes.TestComputev2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - net_mock.assert_called_once_with(self._network['label']) + net_mock.assert_called_once_with( + self.compute_sdk_client, self._network['label'] + ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py index 8068788109..243f830210 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_compute.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py @@ -12,17 +12,17 @@ # from unittest import mock -from unittest.mock import call from osc_lib import exceptions +from openstackclient.api import compute_v2 from openstackclient.network.v2 import security_group from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit import utils as tests_utils -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_create') +@mock.patch.object(compute_v2, 'create_security_group') class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2): project = identity_fakes.FakeProject.create_one_project() domain = identity_fakes.FakeDomain.create_one_domain() @@ -72,6 +72,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) sg_mock.assert_called_once_with( + self.compute_sdk_client, self._security_group['name'], self._security_group['name'], ) @@ -94,6 +95,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) sg_mock.assert_called_once_with( + self.compute_sdk_client, self._security_group['name'], self._security_group['description'], ) @@ -101,7 +103,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2): self.assertCountEqual(self.data, data) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_delete') +@mock.patch.object(compute_v2, 'delete_security_group') class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2): # The security groups to be deleted. _security_groups = compute_fakes.create_security_groups() @@ -111,8 +113,8 @@ class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - self.compute_client.api.security_group_find = ( - compute_fakes.get_security_groups(self._security_groups) + compute_v2.find_security_group = mock.Mock( + side_effect=self._security_groups ) # Get the command object to test @@ -131,59 +133,68 @@ class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) sg_mock.assert_called_once_with( + self.compute_sdk_client, self._security_groups[0]['id'], ) self.assertIsNone(result) def test_security_group_multi_delete(self, sg_mock): sg_mock.return_value = mock.Mock(return_value=None) - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s['id']) + arglist = [ + self._security_groups[0]['id'], + self._security_groups[1]['id'], + ] verifylist = [ ('group', arglist), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - calls = [] - for s in self._security_groups: - calls.append(call(s['id'])) - sg_mock.assert_has_calls(calls) + sg_mock.assert_has_calls( + [ + mock.call( + self.compute_sdk_client, self._security_groups[0]['id'] + ), + mock.call( + self.compute_sdk_client, self._security_groups[1]['id'] + ), + ] + ) self.assertIsNone(result) def test_security_group_multi_delete_with_exception(self, sg_mock): sg_mock.return_value = mock.Mock(return_value=None) - sg_mock.side_effect = [ - mock.Mock(return_value=None), - exceptions.CommandError, + compute_v2.find_security_group.side_effect = [ + self._security_groups[0], + exceptions.NotFound('foo'), ] arglist = [ self._security_groups[0]['id'], 'unexist_security_group', ] verifylist = [ - ( - 'group', - [self._security_groups[0]['id'], 'unexist_security_group'], - ), + ('group', arglist), ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args, + ) + self.assertEqual('1 of 2 groups failed to delete.', str(exc)) - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - sg_mock.assert_any_call(self._security_groups[0]['id']) - sg_mock.assert_any_call('unexist_security_group') + sg_mock.assert_has_calls( + [ + mock.call( + self.compute_sdk_client, self._security_groups[0]['id'] + ), + ] + ) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_list') +@mock.patch.object(compute_v2, 'list_security_groups') class TestListSecurityGroupCompute(compute_fakes.TestComputev2): # The security group to be listed. _security_groups = compute_fakes.create_security_groups(count=3) @@ -238,8 +249,9 @@ class TestListSecurityGroupCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - kwargs = {'search_opts': {'all_tenants': False}} - sg_mock.assert_called_once_with(**kwargs) + sg_mock.assert_called_once_with( + self.compute_sdk_client, all_projects=False + ) self.assertEqual(self.columns, columns) self.assertCountEqual(self.data, list(data)) @@ -255,13 +267,14 @@ class TestListSecurityGroupCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - kwargs = {'search_opts': {'all_tenants': True}} - sg_mock.assert_called_once_with(**kwargs) + sg_mock.assert_called_once_with( + self.compute_sdk_client, all_projects=True + ) self.assertEqual(self.columns_all_projects, columns) self.assertCountEqual(self.data_all_projects, list(data)) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_set') +@mock.patch.object(compute_v2, 'update_security_group') class TestSetSecurityGroupCompute(compute_fakes.TestComputev2): # The security group to be set. _security_group = compute_fakes.create_one_security_group() @@ -271,7 +284,7 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - self.compute_client.api.security_group_find = mock.Mock( + compute_v2.find_security_group = mock.Mock( return_value=self._security_group ) @@ -296,9 +309,7 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) sg_mock.assert_called_once_with( - self._security_group, - self._security_group['name'], - self._security_group['description'], + self.compute_sdk_client, self._security_group['id'] ) self.assertIsNone(result) @@ -323,12 +334,15 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) sg_mock.assert_called_once_with( - self._security_group, new_name, new_description + self.compute_sdk_client, + self._security_group['id'], + name=new_name, + description=new_description, ) self.assertIsNone(result) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_find') +@mock.patch.object(compute_v2, 'find_security_group') class TestShowSecurityGroupCompute(compute_fakes.TestComputev2): # The security group rule to be shown with the group. _security_group_rule = compute_fakes.create_one_security_group_rule() @@ -379,6 +393,8 @@ class TestShowSecurityGroupCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - sg_mock.assert_called_once_with(self._security_group['id']) + sg_mock.assert_called_once_with( + self.compute_sdk_client, self._security_group['id'] + ) self.assertEqual(self.columns, columns) self.assertCountEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py index cc780d54a7..fc3c0ddb19 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py @@ -9,13 +9,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# from unittest import mock -from unittest.mock import call from osc_lib import exceptions +from openstackclient.api import compute_v2 from openstackclient.network import utils as network_utils from openstackclient.network.v2 import security_group_rule from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes @@ -23,7 +22,7 @@ from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit import utils as tests_utils -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_rule_create') +@mock.patch.object(compute_v2, 'create_security_group_rule') class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): project = identity_fakes.FakeProject.create_one_project() domain = identity_fakes.FakeDomain.create_one_domain() @@ -51,7 +50,7 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - self.compute_client.api.security_group_find = mock.Mock( + compute_v2.find_security_group = mock.Mock( return_value=self._security_group, ) @@ -159,9 +158,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - # TODO(dtroyer): save this for the security group rule changes - # self.compute_client.api.security_group_rule_create.assert_called_once_with( sgr_mock.assert_called_once_with( + self.compute_sdk_client, security_group_id=self._security_group['id'], ip_protocol=self._security_group_rule['ip_protocol'], from_port=self._security_group_rule['from_port'], @@ -203,9 +201,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - # TODO(dtroyer): save this for the security group rule changes - # self.compute_client.api.security_group_rule_create.assert_called_once_with( sgr_mock.assert_called_once_with( + self.compute_sdk_client, security_group_id=self._security_group['id'], ip_protocol=self._security_group_rule['ip_protocol'], from_port=self._security_group_rule['from_port'], @@ -242,9 +239,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - # TODO(dtroyer): save this for the security group rule changes - # self.compute_client.api.security_group_rule_create.assert_called_once_with( sgr_mock.assert_called_once_with( + self.compute_sdk_client, security_group_id=self._security_group['id'], ip_protocol=self._security_group_rule['ip_protocol'], from_port=self._security_group_rule['from_port'], @@ -282,9 +278,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - # TODO(dtroyer): save this for the security group rule changes - # self.compute_client.api.security_group_rule_create.assert_called_once_with( sgr_mock.assert_called_once_with( + self.compute_sdk_client, security_group_id=self._security_group['id'], ip_protocol=self._security_group_rule['ip_protocol'], from_port=self._security_group_rule['from_port'], @@ -296,7 +291,7 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2): self.assertEqual(expected_data, data) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_rule_delete') +@mock.patch.object(compute_v2, 'delete_security_group_rule') class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2): # The security group rule to be deleted. _security_group_rules = compute_fakes.create_security_group_rules(count=2) @@ -320,26 +315,35 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) - sgr_mock.assert_called_once_with(self._security_group_rules[0]['id']) + sgr_mock.assert_called_once_with( + self.compute_sdk_client, self._security_group_rules[0]['id'] + ) self.assertIsNone(result) def test_security_group_rule_delete_multi(self, sgr_mock): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s['id']) + arglist = [ + self._security_group_rules[0]['id'], + self._security_group_rules[1]['id'], + ] verifylist = [ ('rule', arglist), ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - calls = [] - for s in self._security_group_rules: - calls.append(call(s['id'])) - sgr_mock.assert_has_calls(calls) + sgr_mock.assert_has_calls( + [ + mock.call( + self.compute_sdk_client, + self._security_group_rules[0]['id'], + ), + mock.call( + self.compute_sdk_client, + self._security_group_rules[1]['id'], + ), + ] + ) self.assertIsNone(result) def test_security_group_rule_delete_multi_with_exception(self, sgr_mock): @@ -348,12 +352,11 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2): 'unexist_rule', ] verifylist = [ - ('rule', [self._security_group_rules[0]['id'], 'unexist_rule']), + ('rule', arglist), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) - find_mock_result = [None, exceptions.CommandError] - sgr_mock.side_effect = find_mock_result + sgr_mock.side_effect = [None, exceptions.NotFound('foo')] try: self.cmd.take_action(parsed_args) @@ -361,8 +364,15 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2): except exceptions.CommandError as e: self.assertEqual('1 of 2 rules failed to delete.', str(e)) - sgr_mock.assert_any_call(self._security_group_rules[0]['id']) - sgr_mock.assert_any_call('unexist_rule') + sgr_mock.assert_has_calls( + [ + mock.call( + self.compute_sdk_client, + self._security_group_rules[0]['id'], + ), + mock.call(self.compute_sdk_client, 'unexist_rule'), + ] + ) class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): @@ -432,10 +442,10 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): self.app.client_manager.network_endpoint_enabled = False - self.compute_client.api.security_group_find = mock.Mock( + compute_v2.find_security_group = mock.Mock( return_value=self._security_group, ) - self.compute_client.api.security_group_list = mock.Mock( + compute_v2.list_security_groups = mock.Mock( return_value=[self._security_group], ) @@ -446,8 +456,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): parsed_args = self.check_parser(self.cmd, [], []) columns, data = self.cmd.take_action(parsed_args) - self.compute_client.api.security_group_list.assert_called_once_with( - search_opts={'all_tenants': False} + compute_v2.list_security_groups.assert_called_once_with( + self.compute_sdk_client, all_projects=False ) self.assertEqual(self.expected_columns_no_group, columns) self.assertEqual(self.expected_data_no_group, list(data)) @@ -462,8 +472,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.compute_client.api.security_group_find.assert_called_once_with( - self._security_group['id'] + compute_v2.find_security_group.assert_called_once_with( + self.compute_sdk_client, self._security_group['id'] ) self.assertEqual(self.expected_columns_with_group, columns) self.assertEqual(self.expected_data_with_group, list(data)) @@ -478,8 +488,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.compute_client.api.security_group_list.assert_called_once_with( - search_opts={'all_tenants': True} + compute_v2.list_security_groups.assert_called_once_with( + self.compute_sdk_client, all_projects=True ) self.assertEqual(self.expected_columns_no_group, columns) self.assertEqual(self.expected_data_no_group, list(data)) @@ -494,8 +504,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.compute_client.api.security_group_list.assert_called_once_with( - search_opts={'all_tenants': False} + compute_v2.list_security_groups.assert_called_once_with( + self.compute_sdk_client, all_projects=False ) self.assertEqual(self.expected_columns_no_group, columns) self.assertEqual(self.expected_data_no_group, list(data)) @@ -517,7 +527,7 @@ class TestShowSecurityGroupRuleCompute(compute_fakes.TestComputev2): # Build a security group fake customized for this test. security_group_rules = [self._security_group_rule] security_group = {'rules': security_group_rules} - self.compute_client.api.security_group_list = mock.Mock( + compute_v2.list_security_groups = mock.Mock( return_value=[security_group], ) @@ -540,6 +550,8 @@ class TestShowSecurityGroupRuleCompute(compute_fakes.TestComputev2): columns, data = self.cmd.take_action(parsed_args) - self.compute_client.api.security_group_list.assert_called_once_with() + compute_v2.list_security_groups.assert_called_once_with( + self.compute_sdk_client + ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data)