network: Replace use of in-tree API client

None of these are actually supported by openstacksdk (intentionally so)
so we add our own manual implementations.

Change-Id: Ifd24f04ae4d1e56e0ce5ba0afe63828403bb7a6f
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2024-05-16 12:01:44 +01:00
parent 30a64579b6
commit 209f8e9e17
18 changed files with 917 additions and 1502 deletions

View File

@ -9,7 +9,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Compute v2 API Library
@ -19,571 +18,49 @@ intentionally supported by SDK. Most of these are proxy APIs.
import http
from keystoneauth1 import exceptions as ksa_exceptions
from openstack import exceptions as sdk_exceptions
from osc_lib.api import api
from osc_lib import exceptions
from osc_lib.i18n import _
# TODO(dtroyer): Migrate this to osc-lib
class InvalidValue(Exception):
"""An argument value is not valid: wrong type, out of range, etc"""
# security groups
message = "Supplied value is not valid"
def create_security_group(compute_client, name=None, description=None):
"""Create a new security group
class APIv2(api.BaseAPI):
"""Compute v2 API"""
https://docs.openstack.org/api-ref/compute/#create-security-group
def __init__(self, **kwargs):
super().__init__(**kwargs)
:param compute_client: A compute client
:param str name: Security group name
:param str description: Security group description
:returns: A security group object
"""
data = {
'name': name,
'description': description,
}
response = compute_client.post(
'/os-security-groups', data=data, microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
return response.json()['security_group']
# Overrides
def _check_integer(self, value, msg=None):
"""Attempt to convert value to an integer
def list_security_groups(compute_client, all_projects=None):
"""Get all security groups
Raises InvalidValue on failure
https://docs.openstack.org/api-ref/compute/#list-security-groups
:param value:
Convert this to an integer. None is converted to 0 (zero).
:param msg:
An alternate message for the exception, must include exactly
one substitution to receive the attempted value.
"""
if value is None:
return 0
try:
value = int(value)
except (TypeError, ValueError):
if not msg:
msg = _("%s is not an integer") % value
raise InvalidValue(msg)
return value
# TODO(dtroyer): Override find() until these fixes get into an osc-lib
# minimum release
def find(
self,
path,
value=None,
attr=None,
):
"""Find a single resource by name or ID
:param string path:
The API-specific portion of the URL path
:param string value:
search expression (required, really)
:param string attr:
name of attribute for secondary search
"""
try:
ret = self._request('GET', f"/{path}/{value}").json()
if isinstance(ret, dict):
# strip off the enclosing dict
key = list(ret.keys())[0]
ret = ret[key]
except (
ksa_exceptions.NotFound,
ksa_exceptions.BadRequest,
):
kwargs = {attr: value}
try:
ret = self.find_one(path, **kwargs)
except ksa_exceptions.NotFound:
msg = _("%s not found") % value
raise exceptions.NotFound(msg)
return ret
# Floating IPs
def floating_ip_add(
self,
server,
address,
fixed_address=None,
):
"""Add a floating IP to a server
:param server:
The :class:`Server` (or its ID) to add an IP to.
:param address:
The FloatingIP or string floating address to add.
:param fixed_address:
The FixedIP the floatingIP should be associated with (optional)
"""
url = '/servers'
server = self.find(
url,
attr='name',
value=server,
)
address = address.ip if hasattr(address, 'ip') else address
if fixed_address:
if hasattr(fixed_address, 'ip'):
fixed_address = fixed_address.ip
body = {
'address': address,
'fixed_address': fixed_address,
}
else:
body = {
'address': address,
}
return self._request(
"POST",
"/{}/{}/action".format(url, server['id']),
json={'addFloatingIp': body},
)
def floating_ip_create(
self,
pool=None,
):
"""Create a new floating ip
https://docs.openstack.org/api-ref/compute/#create-allocate-floating-ip-address
:param pool: Name of floating IP pool
"""
url = "/os-floating-ips"
try:
return self.create(
url,
json={'pool': pool},
)['floating_ip']
except (
ksa_exceptions.NotFound,
ksa_exceptions.BadRequest,
):
msg = _("%s not found") % pool
raise exceptions.NotFound(msg)
def floating_ip_delete(
self,
floating_ip_id=None,
):
"""Delete a floating IP
https://docs.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address
:param string floating_ip_id:
Floating IP ID
"""
url = "/os-floating-ips"
if floating_ip_id is not None:
return self.delete(f'/{url}/{floating_ip_id}')
return None
def floating_ip_find(
self,
floating_ip=None,
):
"""Return a security group given name or ID
https://docs.openstack.org/api-ref/compute/#list-floating-ip-addresses
:param string floating_ip:
Floating IP address
:returns: A dict of the floating IP attributes
"""
url = "/os-floating-ips"
return self.find(
url,
attr='ip',
value=floating_ip,
)
def floating_ip_list(
self,
):
"""Get floating IPs
https://docs.openstack.org/api-ref/compute/#show-floating-ip-address-details
:returns:
list of floating IPs
"""
url = "/os-floating-ips"
return self.list(url)["floating_ips"]
def floating_ip_remove(
self,
server,
address,
):
"""Remove a floating IP from a server
:param server:
The :class:`Server` (or its ID) to add an IP to.
:param address:
The FloatingIP or string floating address to add.
"""
url = '/servers'
server = self.find(
url,
attr='name',
value=server,
)
address = address.ip if hasattr(address, 'ip') else address
body = {
'address': address,
}
return self._request(
"POST",
"/{}/{}/action".format(url, server['id']),
json={'removeFloatingIp': body},
)
# Floating IP Pools
def floating_ip_pool_list(
self,
):
"""Get floating IP pools
https://docs.openstack.org/api-ref/compute/?expanded=#list-floating-ip-pools
:returns:
list of floating IP pools
"""
url = "/os-floating-ip-pools"
return self.list(url)["floating_ip_pools"]
# Networks
def network_create(
self,
name=None,
subnet=None,
share_subnet=None,
):
"""Create a new network
https://docs.openstack.org/api-ref/compute/#create-network
:param string name:
Network label (required)
:param integer subnet:
Subnet for IPv4 fixed addresses in CIDR notation (required)
:param integer share_subnet:
Shared subnet between projects, True or False
:returns: A dict of the network attributes
"""
url = "/os-networks"
params = {
'label': name,
'cidr': subnet,
}
if share_subnet is not None:
params['share_address'] = share_subnet
return self.create(
url,
json={'network': params},
)['network']
def network_delete(
self,
network=None,
):
"""Delete a network
https://docs.openstack.org/api-ref/compute/#delete-network
:param string network:
Network name or ID
"""
url = "/os-networks"
network = self.find(
url,
attr='label',
value=network,
)['id']
if network is not None:
return self.delete(f'/{url}/{network}')
return None
def network_find(
self,
network=None,
):
"""Return a network given name or ID
https://docs.openstack.org/api-ref/compute/#show-network-details
:param string network:
Network name or ID
:returns: A dict of the network attributes
"""
url = "/os-networks"
return self.find(
url,
attr='label',
value=network,
)
def network_list(
self,
):
"""Get networks
https://docs.openstack.org/api-ref/compute/#list-networks
:returns:
list of networks
"""
url = "/os-networks"
return self.list(url)["networks"]
# Security Groups
def security_group_create(
self,
name=None,
description=None,
):
"""Create a new security group
https://docs.openstack.org/api-ref/compute/#create-security-group
:param string name:
Security group name
:param integer description:
Security group description
"""
url = "/os-security-groups"
params = {
'name': name,
'description': description,
}
return self.create(
url,
json={'security_group': params},
)['security_group']
def security_group_delete(
self,
security_group=None,
):
"""Delete a security group
https://docs.openstack.org/api-ref/compute/#delete-security-group
:param string security_group:
Security group name or ID
"""
url = "/os-security-groups"
security_group = self.find(
url,
attr='name',
value=security_group,
)['id']
if security_group is not None:
return self.delete(f'/{url}/{security_group}')
return None
def security_group_find(
self,
security_group=None,
):
"""Return a security group given name or ID
https://docs.openstack.org/api-ref/compute/#show-security-group-details
:param string security_group:
Security group name or ID
:returns: A dict of the security group attributes
"""
url = "/os-security-groups"
return self.find(
url,
attr='name',
value=security_group,
)
def security_group_list(
self,
limit=None,
marker=None,
search_opts=None,
):
"""Get security groups
https://docs.openstack.org/api-ref/compute/#list-security-groups
:param integer limit:
query return count limit
:param string marker:
query marker
:param search_opts:
(undocumented) Search filter dict
all_tenants: True|False - return all projects
:returns:
list of security groups names
"""
params = {}
if search_opts is not None:
params = {k: v for (k, v) in search_opts.items() if v}
if limit:
params['limit'] = limit
if marker:
params['offset'] = marker
url = "/os-security-groups"
return self.list(url, **params)["security_groups"]
def security_group_set(
self,
security_group=None,
# name=None,
# description=None,
**params,
):
"""Update a security group
https://docs.openstack.org/api-ref/compute/#update-security-group
:param string security_group:
Security group name or ID
TODO(dtroyer): Create an update method in osc-lib
"""
# Short-circuit no-op
if params is None:
return None
url = "/os-security-groups"
security_group = self.find(
url,
attr='name',
value=security_group,
)
if security_group is not None:
for k, v in params.items():
# Only set a value if it is already present
if k in security_group:
security_group[k] = v
return self._request(
"PUT",
"/{}/{}".format(url, security_group['id']),
json={'security_group': security_group},
).json()['security_group']
return None
# Security Group Rules
def security_group_rule_create(
self,
security_group_id=None,
ip_protocol=None,
from_port=None,
to_port=None,
remote_ip=None,
remote_group=None,
):
"""Create a new security group rule
https://docs.openstack.org/api-ref/compute/#create-security-group-rule
:param string security_group_id:
Security group ID
:param ip_protocol:
IP protocol, 'tcp', 'udp' or 'icmp'
:param from_port:
Source port
:param to_port:
Destination port
:param remote_ip:
Source IP address in CIDR notation
:param remote_group:
Remote security group
"""
url = "/os-security-group-rules"
if ip_protocol.lower() not in ['icmp', 'tcp', 'udp']:
raise InvalidValue(
"%(s) is not one of 'icmp', 'tcp', or 'udp'" % ip_protocol
)
params = {
'parent_group_id': security_group_id,
'ip_protocol': ip_protocol,
'from_port': self._check_integer(from_port),
'to_port': self._check_integer(to_port),
'cidr': remote_ip,
'group_id': remote_group,
}
return self.create(
url,
json={'security_group_rule': params},
)['security_group_rule']
def security_group_rule_delete(
self,
security_group_rule_id=None,
):
"""Delete a security group rule
https://docs.openstack.org/api-ref/compute/#delete-security-group-rule
:param string security_group_rule_id:
Security group rule ID
"""
url = "/os-security-group-rules"
if security_group_rule_id is not None:
return self.delete(f'/{url}/{security_group_rule_id}')
return None
:param compute_client: A compute client
:param bool all_projects: If true, list from all projects
:returns: A list of security group objects
"""
url = '/os-security-groups'
if all_projects is not None:
url += f'?all_tenants={all_projects}'
response = compute_client.get(url, microversion='2.1')
sdk_exceptions.raise_from_response(response)
return response.json()['security_groups']
def find_security_group(compute_client, name_or_id):
@ -623,6 +100,145 @@ def find_security_group(compute_client, name_or_id):
return found
def update_security_group(
compute_client, security_group_id, name=None, description=None
):
"""Update an existing security group
https://docs.openstack.org/api-ref/compute/#update-security-group
:param compute_client: A compute client
:param str security_group_id: The ID of the security group to update
:param str name: Security group name
:param str description: Security group description
:returns: A security group object
"""
data = {}
if name:
data['name'] = name
if description:
data['description'] = description
response = compute_client.put(
f'/os-security-groups/{security_group_id}',
data=data,
microversion='2.1',
)
sdk_exceptions.raise_from_response(response)
return response.json()['security_group']
def delete_security_group(compute_client, security_group_id=None):
"""Delete a security group
https://docs.openstack.org/api-ref/compute/#delete-security-group
:param compute_client: A compute client
:param str security_group_id: Security group ID
:returns: None
"""
response = compute_client.delete(
f'/os-security-groups/{security_group_id}', microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
# security group rules
def create_security_group_rule(
compute_client,
security_group_id=None,
ip_protocol=None,
from_port=None,
to_port=None,
remote_ip=None,
remote_group=None,
):
"""Create a new security group rule
https://docs.openstack.org/api-ref/compute/#create-security-group-rule
:param compute_client: A compute client
:param str security_group_id: Security group ID
:param str ip_protocol: IP protocol, 'tcp', 'udp' or 'icmp'
:param int from_port: Source port
:param int to_port: Destination port
:param str remote_ip: Source IP address in CIDR notation
:param str remote_group: Remote security group
:returns: A security group object
"""
data = {
'parent_group_id': security_group_id,
'ip_protocol': ip_protocol,
'from_port': from_port,
'to_port': to_port,
'cidr': remote_ip,
'group_id': remote_group,
}
response = compute_client.post(
'/os-security-group-rules', data=data, microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
return response.json()['security_group_rule']
def delete_security_group_rule(compute_client, security_group_rule_id=None):
"""Delete a security group rule
https://docs.openstack.org/api-ref/compute/#delete-security-group-rule
:param compute_client: A compute client
:param str security_group_rule_id: Security group rule ID
:returns: None
"""
response = compute_client.delete(
f'/os-security-group-rules/{security_group_rule_id}',
microversion='2.1',
)
sdk_exceptions.raise_from_response(response)
# networks
def create_network(compute_client, name, subnet, share_subnet=None):
"""Create a new network
https://docs.openstack.org/api-ref/compute/#create-network
:param compute_client: A compute client
:param str name: Network label
:param int subnet: Subnet for IPv4 fixed addresses in CIDR notation
:param bool share_subnet: Shared subnet between projects
:returns: A network object
"""
data = {
'label': name,
'cidr': subnet,
}
if share_subnet is not None:
data['share_address'] = share_subnet
response = compute_client.post(
'/os-networks', data=data, microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
return response.json()['network']
def list_networks(compute_client):
"""Get all networks
https://docs.openstack.org/api-ref/compute/#list-networks
:param compute_client: A compute client
:returns: A list of network objects
"""
response = compute_client.get('/os-networks', microversion='2.1')
sdk_exceptions.raise_from_response(response)
return response.json()['networks']
def find_network(compute_client, name_or_id):
"""Find the ID for a given network name or ID
@ -658,3 +274,94 @@ def find_network(compute_client, name_or_id):
raise exceptions.NotFound(f'{name_or_id} not found')
return found
def delete_network(compute_client, network_id):
"""Delete a network
https://docs.openstack.org/api-ref/compute/#delete-network
:param compute_client: A compute client
:param string network_id: The network ID
:returns: None
"""
response = compute_client.delete(
f'/os-networks/{network_id}', microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
# floating ips
def create_floating_ip(compute_client, network):
"""Create a new floating ip
https://docs.openstack.org/api-ref/compute/#create-allocate-floating-ip-address
:param network: Name of floating IP pool
:returns: A floating IP object
"""
response = compute_client.post(
'/os-floating-ips', data={'pool': network}, microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
return response.json()['floating_ip']
def list_floating_ips(compute_client):
"""Get all floating IPs
https://docs.openstack.org/api-ref/compute/#list-floating-ip-addresses
:returns: A list of floating IP objects
"""
response = compute_client.get('/os-floating-ips', microversion='2.1')
sdk_exceptions.raise_from_response(response)
return response.json()['floating_ips']
def get_floating_ip(compute_client, floating_ip_id):
"""Get a floating IP
https://docs.openstack.org/api-ref/compute/#show-floating-ip-address-details
:param string floating_ip_id: The floating IP address
:returns: A floating IP object
"""
response = compute_client.get(
f'/os-floating-ips/{floating_ip_id}', microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
return response.json()['floating_ip']
def delete_floating_ip(compute_client, floating_ip_id):
"""Delete a floating IP
https://docs.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address
:param string floating_ip_id: The floating IP address
:returns: None
"""
response = compute_client.delete(
f'/os-floating-ips/{floating_ip_id}', microversion='2.1'
)
sdk_exceptions.raise_from_response(response)
# floating ip pools
def list_floating_ip_pools(compute_client):
"""Get all floating IP pools
https://docs.openstack.org/api-ref/compute/#list-floating-ip-pools
:param compute_client: A compute client
:returns: A list of floating IP pool objects
"""
response = compute_client.get('/os-floating-ip-pools', microversion='2.1')
sdk_exceptions.raise_from_response(response)
return response.json()['floating_ip_pools']

View File

@ -31,7 +31,6 @@ API_VERSIONS = {
"2.1": "novaclient.client",
}
COMPUTE_API_TYPE = 'compute'
COMPUTE_API_VERSIONS = {
'2': 'openstackclient.api.compute_v2.APIv2',
}
@ -63,15 +62,6 @@ def make_client(instance):
# fallback to use the max version of novaclient side.
version = novaclient.API_MAX_VERSION
LOG.debug('Instantiating compute client for %s', version)
compute_api = utils.get_client_class(
API_NAME,
version.ver_major,
COMPUTE_API_VERSIONS,
)
LOG.debug('Instantiating compute api: %s', compute_api)
# Set client http_log_debug to True if verbosity level is high enough
http_log_debug = utils.get_effective_log_level() <= logging.DEBUG
@ -94,16 +84,6 @@ def make_client(instance):
**kwargs
)
client.api = compute_api(
session=instance.session,
service_type=COMPUTE_API_TYPE,
endpoint=instance.get_endpoint_for_service_type(
COMPUTE_API_TYPE,
region_name=instance.region_name,
interface=instance.interface,
),
)
return client

View File

@ -552,8 +552,9 @@ class AddFloatingIP(network_common.NetworkAndComputeCommand):
raise error
def take_action_compute(self, client, parsed_args):
client.api.floating_ip_add(
parsed_args.server,
server = client.find_server(parsed_args.server, ignore_missing=False)
client.add_floating_ip_to_server(
server,
parsed_args.ip_address,
fixed_address=parsed_args.fixed_ip_address,
)
@ -3921,10 +3922,8 @@ class RemoveFloatingIP(network_common.NetworkAndComputeCommand):
client.update_ip(obj, **attrs)
def take_action_compute(self, client, parsed_args):
client.api.floating_ip_remove(
parsed_args.server,
parsed_args.ip_address,
)
server = client.find_server(parsed_args.server, ignore_missing=False)
client.remove_floating_ip_from_server(server, parsed_args.ip_address)
class RemovePort(command.Command):

View File

@ -161,7 +161,7 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
)
elif self.is_nova_network:
return self.take_action_compute(
self.app.client_manager.compute, parsed_args
self.app.client_manager.sdk_connection.compute, parsed_args
)
def take_action_network(self, client, parsed_args):
@ -210,7 +210,8 @@ class NetworkAndComputeDelete(NetworkAndComputeCommand, metaclass=abc.ABCMeta):
)
else:
self.take_action_compute(
self.app.client_manager.compute, parsed_args
self.app.client_manager.sdk_connection.compute,
parsed_args,
)
except Exception as e:
msg = _(
@ -267,7 +268,7 @@ class NetworkAndComputeShowOne(
)
else:
return self.take_action_compute(
self.app.client_manager.compute, parsed_args
self.app.client_manager.sdk_connection.compute, parsed_args
)
except openstack.exceptions.HttpException as exc:
msg = _("Error while executing command: %s") % exc.message

View File

@ -9,18 +9,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""IP Floating action implementations"""
from osc_lib import utils
from osc_lib.utils import tags as _tag
from openstackclient.api import compute_v2
from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
from openstackclient.network import common
_formatters = {
'port_details': utils.format_dict,
}
@ -200,7 +199,7 @@ class CreateFloatingIP(
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
obj = client.api.floating_ip_create(parsed_args.network)
obj = compute_v2.create_floating_ip(client, parsed_args.network)
columns = _get_columns(obj)
data = utils.get_dict_properties(obj, columns)
return (columns, data)
@ -230,7 +229,7 @@ class DeleteFloatingIP(common.NetworkAndComputeDelete):
client.delete_ip(obj)
def take_action_compute(self, client, parsed_args):
client.api.floating_ip_delete(self.r)
compute_v2.delete_floating_ip(client, self.r)
class ListFloatingIP(common.NetworkAndComputeLister):
@ -421,8 +420,7 @@ class ListFloatingIP(common.NetworkAndComputeLister):
'Pool',
)
data = client.api.floating_ip_list()
objs = compute_v2.list_floating_ips(client)
return (
headers,
(
@ -431,7 +429,7 @@ class ListFloatingIP(common.NetworkAndComputeLister):
columns,
formatters={},
)
for s in data
for s in objs
),
)
@ -538,7 +536,7 @@ class ShowFloatingIP(common.NetworkAndComputeShowOne):
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
obj = client.api.floating_ip_find(parsed_args.floating_ip)
obj = compute_v2.get_floating_ip(client, parsed_args.floating_ip)
columns = _get_columns(obj)
data = utils.get_dict_properties(obj, columns)
return (columns, data)

View File

@ -9,14 +9,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Floating IP Pool action implementations"""
from osc_lib import exceptions
from osc_lib import utils
from openstackclient.api import compute_v2
from openstackclient.i18n import _
from openstackclient.network import common
@ -33,15 +31,8 @@ class ListFloatingIPPool(common.NetworkAndComputeLister):
def take_action_compute(self, client, parsed_args):
columns = ('Name',)
data = client.api.floating_ip_pool_list()
data = [
(x['name'],) for x in compute_v2.list_floating_ip_pools(client)
]
return (
columns,
(
utils.get_dict_properties(
s,
columns,
)
for s in data
),
)
return (columns, data)

View File

@ -9,7 +9,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Network action implementations"""
@ -18,6 +17,7 @@ from osc_lib.cli import format_columns
from osc_lib import utils
from osc_lib.utils import tags as _tag
from openstackclient.api import compute_v2
from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
from openstackclient.network import common
@ -388,7 +388,7 @@ class CreateNetwork(
def take_action_compute(self, client, parsed_args):
attrs = _get_attrs_compute(self.app.client_manager, parsed_args)
obj = client.api.network_create(**attrs)
obj = compute_v2.create_network(client, **attrs)
display_columns, columns = _get_columns_compute(obj)
data = utils.get_dict_properties(obj, columns)
return (display_columns, data)
@ -416,7 +416,8 @@ class DeleteNetwork(common.NetworkAndComputeDelete):
client.delete_network(obj)
def take_action_compute(self, client, parsed_args):
client.api.network_delete(self.r)
network = compute_v2.find_network(client, self.r)
compute_v2.delete_network(client, network['id'])
# TODO(sindhu): Use the SDK resource mapped attribute names once the
@ -673,7 +674,7 @@ class ListNetwork(common.NetworkAndComputeLister):
'Subnet',
)
data = client.api.network_list()
data = compute_v2.list_networks(client)
return (
column_headers,
@ -828,7 +829,7 @@ class ShowNetwork(common.NetworkAndComputeShowOne):
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
obj = client.api.network_find(parsed_args.network)
obj = compute_v2.find_network(client, parsed_args.network)
display_columns, columns = _get_columns_compute(obj)
data = utils.get_dict_properties(obj, columns)
return (display_columns, data)

View File

@ -20,6 +20,7 @@ from osc_lib.command import command
from osc_lib import utils
from osc_lib.utils import tags as _tag
from openstackclient.api import compute_v2
from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
from openstackclient.network import common
@ -180,7 +181,8 @@ class CreateSecurityGroup(
def take_action_compute(self, client, parsed_args):
description = self._get_description(parsed_args)
obj = client.api.security_group_create(
obj = compute_v2.create_security_group(
client,
parsed_args.name,
description,
)
@ -212,7 +214,8 @@ class DeleteSecurityGroup(common.NetworkAndComputeDelete):
client.delete_security_group(obj)
def take_action_compute(self, client, parsed_args):
client.api.security_group_delete(self.r)
security_group = compute_v2.find_security_group(client, self.r)
compute_v2.delete_security_group(client, security_group['id'])
# TODO(rauta): Use the SDK resource mapped attribute names once
@ -291,10 +294,10 @@ class ListSecurityGroup(common.NetworkAndComputeLister):
)
def take_action_compute(self, client, parsed_args):
search = {'all_tenants': parsed_args.all_projects}
data = client.api.security_group_list(
data = compute_v2.list_security_groups(
# TODO(dtroyer): add limit, marker
search_opts=search,
client,
all_projects=parsed_args.all_projects,
)
columns = (
@ -383,20 +386,21 @@ class SetSecurityGroup(
_tag.update_tags_for_set(client, obj, parsed_args)
def take_action_compute(self, client, parsed_args):
data = client.api.security_group_find(parsed_args.group)
security_group = compute_v2.find_security_group(
client, parsed_args.group
)
params = {}
if parsed_args.name is not None:
data['name'] = parsed_args.name
params['name'] = parsed_args.name
if parsed_args.description is not None:
data['description'] = parsed_args.description
params['description'] = parsed_args.description
# NOTE(rtheis): Previous behavior did not raise a CommandError
# if there were no updates. Maintain this behavior and issue
# the update.
client.api.security_group_set(
data,
data['name'],
data['description'],
compute_v2.update_security_group(
client, security_group['id'], **params
)
@ -422,7 +426,7 @@ class ShowSecurityGroup(common.NetworkAndComputeShowOne):
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
obj = client.api.security_group_find(parsed_args.group)
obj = compute_v2.find_security_group(client, parsed_args.group)
display_columns, property_columns = _get_columns(obj)
data = utils.get_dict_properties(
obj, property_columns, formatters=_formatters_compute

View File

@ -20,6 +20,7 @@ from osc_lib.cli import parseractions
from osc_lib import exceptions
from osc_lib import utils
from openstackclient.api import compute_v2
from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
from openstackclient.network import common
@ -91,7 +92,7 @@ class CreateSecurityGroupRule(
"ending port range: 137:139. Required for IP protocols TCP "
"and UDP. Ignored for ICMP IP protocols."
),
**dst_port_default
**dst_port_default,
)
# NOTE(rtheis): Support either protocol option name for now.
@ -125,7 +126,7 @@ class CreateSecurityGroupRule(
metavar='<protocol>',
type=network_utils.convert_to_lowercase,
help=protocol_help,
**proto_choices
**proto_choices,
)
if not self.is_docs_build:
protocol_group.add_argument(
@ -133,7 +134,7 @@ class CreateSecurityGroupRule(
metavar='<proto>',
type=network_utils.convert_to_lowercase,
help=argparse.SUPPRESS,
**proto_choices
**proto_choices,
)
return parser
@ -292,7 +293,7 @@ class CreateSecurityGroupRule(
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
group = client.api.security_group_find(parsed_args.group)
group = compute_v2.find_security_group(client, parsed_args.group)
protocol = network_utils.get_protocol(
parsed_args, default_protocol='tcp'
)
@ -303,15 +304,16 @@ class CreateSecurityGroupRule(
remote_ip = None
if parsed_args.remote_group is not None:
parsed_args.remote_group = client.api.security_group_find(
parsed_args.remote_group,
parsed_args.remote_group = compute_v2.find_security_group(
client, parsed_args.remote_group
)['id']
if parsed_args.remote_ip is not None:
remote_ip = parsed_args.remote_ip
else:
remote_ip = '0.0.0.0/0'
obj = client.api.security_group_rule_create(
obj = compute_v2.create_security_group_rule(
client,
security_group_id=group['id'],
ip_protocol=protocol,
from_port=from_port,
@ -343,7 +345,7 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeDelete):
client.delete_security_group_rule(obj)
def take_action_compute(self, client, parsed_args):
client.api.security_group_rule_delete(self.r)
compute_v2.delete_security_group_rule(client, self.r)
class ListSecurityGroupRule(common.NetworkAndComputeLister):
@ -532,15 +534,16 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
rules_to_list = []
if parsed_args.group is not None:
group = client.api.security_group_find(
parsed_args.group,
security_group = compute_v2.find_security_group(
client, parsed_args.group
)
rules_to_list = group['rules']
rules_to_list = security_group['rules']
else:
columns = columns + ('parent_group_id',)
search = {'all_tenants': parsed_args.all_projects}
for group in client.api.security_group_list(search_opts=search):
rules_to_list.extend(group['rules'])
for security_group in compute_v2.list_security_groups(
client, all_projects=parsed_args.all_projects
):
rules_to_list.extend(security_group['rules'])
# NOTE(rtheis): Turn the raw rules into resources.
rules = []
@ -596,7 +599,7 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
# the requested rule.
obj = None
security_group_rules = []
for security_group in client.api.security_group_list():
for security_group in compute_v2.list_security_groups(client):
security_group_rules.extend(security_group['rules'])
for security_group_rule in security_group_rules:
if parsed_args.rule == str(security_group_rule.get('id')):

File diff suppressed because it is too large Load Diff

View File

@ -39,7 +39,6 @@ from openstack.compute.v2 import service as _service
from openstack.compute.v2 import usage as _usage
from openstack.compute.v2 import volume_attachment as _volume_attachment
from openstackclient.api import compute_v2
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
from openstackclient.tests.unit.image.v2 import fakes as image_fakes
@ -154,11 +153,6 @@ class FakeClientMixin:
)
self.compute_client = self.app.client_manager.compute
self.compute_client.api = compute_v2.APIv2(
session=self.app.client_manager.session,
endpoint=fakes.AUTH_URL,
)
# TODO(stephenfin): Rename to 'compute_client' once all commands are
# migrated to SDK
self.app.client_manager.sdk_connection.compute = mock.Mock(

View File

@ -392,57 +392,57 @@ class TestServerAddFixedIP(TestServer):
)
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_add')
class TestServerAddFloatingIPCompute(compute_fakes.TestComputev2):
def setUp(self):
super().setUp()
self.app.client_manager.network_endpoint_enabled = False
self.server = compute_fakes.create_one_sdk_server()
self.compute_sdk_client.find_server.return_value = self.server
# Get the command object to test
self.cmd = server.AddFloatingIP(self.app, None)
def test_server_add_floating_ip_default(self, fip_mock):
_floating_ip = compute_fakes.create_one_floating_ip()
def test_server_add_floating_ip_default(self):
arglist = [
'server1',
_floating_ip['ip'],
self.server.name,
'1.2.3.4',
]
verifylist = [
('server', 'server1'),
('ip_address', _floating_ip['ip']),
('server', self.server.name),
('ip_address', '1.2.3.4'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(
'server1',
_floating_ip['ip'],
fixed_address=None,
self.compute_sdk_client.find_server.assert_called_once_with(
self.server.name, ignore_missing=False
)
self.compute_sdk_client.add_floating_ip_to_server.assert_called_once_with(
self.server, '1.2.3.4', fixed_address=None
)
def test_server_add_floating_ip_fixed(self, fip_mock):
_floating_ip = compute_fakes.create_one_floating_ip()
def test_server_add_floating_ip_fixed(self):
arglist = [
'--fixed-ip-address',
_floating_ip['fixed_ip'],
'server1',
_floating_ip['ip'],
'5.6.7.8',
self.server.name,
'1.2.3.4',
]
verifylist = [
('fixed_ip_address', _floating_ip['fixed_ip']),
('server', 'server1'),
('ip_address', _floating_ip['ip']),
('fixed_ip_address', '5.6.7.8'),
('server', self.server.name),
('ip_address', '1.2.3.4'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(
'server1',
_floating_ip['ip'],
fixed_address=_floating_ip['fixed_ip'],
self.compute_sdk_client.find_server.assert_called_once_with(
self.server.name, ignore_missing=False
)
self.compute_sdk_client.add_floating_ip_to_server.assert_called_once_with(
self.server, '1.2.3.4', fixed_address='5.6.7.8'
)
@ -7267,34 +7267,34 @@ class TestServerRescue(compute_fakes.TestComputev2):
self.assertIsNone(result)
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_remove')
class TestServerRemoveFloatingIPCompute(compute_fakes.TestComputev2):
def setUp(self):
super().setUp()
self.app.client_manager.network_endpoint_enabled = False
self.server = compute_fakes.create_one_sdk_server()
self.compute_sdk_client.find_server.return_value = self.server
# Get the command object to test
self.cmd = server.RemoveFloatingIP(self.app, None)
def test_server_remove_floating_ip(self, fip_mock):
_floating_ip = compute_fakes.create_one_floating_ip()
def test_server_remove_floating_ip(self):
arglist = [
'server1',
_floating_ip['ip'],
self.server.name,
'1.2.3.4',
]
verifylist = [
('server', 'server1'),
('ip_address', _floating_ip['ip']),
('server', self.server.name),
('ip_address', '1.2.3.4'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(
'server1',
_floating_ip['ip'],
self.compute_sdk_client.find_server.assert_called_once_with(
self.server.name, ignore_missing=False
)
self.compute_sdk_client.remove_floating_ip_from_server.assert_called_once_with(
self.server, '1.2.3.4'
)

View File

@ -132,8 +132,8 @@ class TestNetworkAndCompute(utils.TestCommand):
return_value='take_action_network'
)
self.app.client_manager.compute = mock.Mock()
self.compute_client = self.app.client_manager.compute
self.app.client_manager.sdk_connection.compute = mock.Mock()
self.compute_client = self.app.client_manager.sdk_connection.compute
self.compute_client.compute_action = mock.Mock(
return_value='take_action_compute'
)

View File

@ -12,21 +12,17 @@
#
from unittest import mock
from unittest.mock import call
from osc_lib import exceptions
from openstackclient.api import compute_v2
from openstackclient.network.v2 import floating_ip as fip
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit import utils as tests_utils
# Tests for Nova network
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_create')
@mock.patch.object(compute_v2, 'create_floating_ip')
class TestCreateFloatingIPCompute(compute_fakes.TestComputev2):
# The floating ip to be deleted.
_floating_ip = compute_fakes.create_one_floating_ip()
columns = (
@ -50,9 +46,6 @@ class TestCreateFloatingIPCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# self.compute_client.floating_ips.create.return_value = self.floating_ip
# Get the command object to test
self.cmd = fip.CreateFloatingIP(self.app, None)
def test_floating_ip_create_no_arg(self, fip_mock):
@ -79,14 +72,15 @@ class TestCreateFloatingIPCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(self._floating_ip['pool'])
fip_mock.assert_called_once_with(
self.compute_sdk_client, self._floating_ip['pool']
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_delete')
@mock.patch.object(compute_v2, 'delete_floating_ip')
class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2):
# The floating ips to be deleted.
_floating_ips = compute_fakes.create_floating_ips(count=2)
def setUp(self):
@ -94,7 +88,6 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = fip.DeleteFloatingIP(self.app, None)
def test_floating_ip_delete(self, fip_mock):
@ -109,27 +102,34 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(self._floating_ips[0]['id'])
fip_mock.assert_called_once_with(
self.compute_sdk_client, self._floating_ips[0]['id']
)
self.assertIsNone(result)
def test_floating_ip_delete_multi(self, fip_mock):
fip_mock.return_value = mock.Mock(return_value=None)
arglist = []
verifylist = []
for f in self._floating_ips:
arglist.append(f['id'])
arglist = [
self._floating_ips[0]['id'],
self._floating_ips[1]['id'],
]
verifylist = [
('floating_ip', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for f in self._floating_ips:
calls.append(call(f['id']))
fip_mock.assert_has_calls(calls)
fip_mock.assert_has_calls(
[
mock.call(
self.compute_sdk_client, self._floating_ips[0]['id']
),
mock.call(
self.compute_sdk_client, self._floating_ips[1]['id']
),
]
)
self.assertIsNone(result)
def test_floating_ip_delete_multi_exception(self, fip_mock):
@ -156,13 +156,16 @@ class TestDeleteFloatingIPCompute(compute_fakes.TestComputev2):
except exceptions.CommandError as e:
self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
fip_mock.assert_any_call(self._floating_ips[0]['id'])
fip_mock.assert_any_call('unexist_floating_ip')
fip_mock.assert_any_call(
self.compute_sdk_client, self._floating_ips[0]['id']
)
fip_mock.assert_any_call(
self.compute_sdk_client, 'unexist_floating_ip'
)
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_list')
@mock.patch.object(compute_v2, 'list_floating_ips')
class TestListFloatingIPCompute(compute_fakes.TestComputev2):
# The floating ips to be list up
_floating_ips = compute_fakes.create_floating_ips(count=3)
columns = (
@ -190,7 +193,6 @@ class TestListFloatingIPCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = fip.ListFloatingIP(self.app, None)
def test_floating_ip_list(self, fip_mock):
@ -201,14 +203,13 @@ class TestListFloatingIPCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with()
fip_mock.assert_called_once_with(self.compute_sdk_client)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_find')
@mock.patch.object(compute_v2, 'get_floating_ip')
class TestShowFloatingIPCompute(compute_fakes.TestComputev2):
# The floating ip to display.
_floating_ip = compute_fakes.create_one_floating_ip()
columns = (
@ -232,7 +233,6 @@ class TestShowFloatingIPCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = fip.ShowFloatingIP(self.app, None)
def test_floating_ip_show(self, fip_mock):
@ -247,6 +247,8 @@ class TestShowFloatingIPCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
fip_mock.assert_called_once_with(self._floating_ip['id'])
fip_mock.assert_called_once_with(
self.compute_sdk_client, self._floating_ip['id']
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)

View File

@ -13,14 +13,12 @@
from unittest import mock
from openstackclient.api import compute_v2
from openstackclient.network.v2 import floating_ip_pool
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
# Tests for Compute network
@mock.patch('openstackclient.api.compute_v2.APIv2.floating_ip_pool_list')
@mock.patch.object(compute_v2, 'list_floating_ip_pools')
class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2):
# The floating ip pools to list up
_floating_ip_pools = compute_fakes.create_floating_ip_pools(count=3)
@ -36,7 +34,6 @@ class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None)
def test_floating_ip_list(self, fipp_mock):
@ -47,6 +44,6 @@ class TestListFloatingIPPoolCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
fipp_mock.assert_called_once_with()
fipp_mock.assert_called_once_with(self.compute_sdk_client)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))

View File

@ -12,22 +12,17 @@
#
from unittest import mock
from unittest.mock import call
from osc_lib import exceptions
from openstackclient.api import compute_v2
from openstackclient.network.v2 import network
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit import utils as tests_utils
# Tests for Nova network
#
@mock.patch('openstackclient.api.compute_v2.APIv2.network_create')
@mock.patch.object(compute_v2, 'create_network')
class TestCreateNetworkCompute(compute_fakes.TestComputev2):
# The network to create.
_network = compute_fakes.create_one_network()
columns = (
@ -105,7 +100,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = network.CreateNetwork(self.app, None)
def test_network_create_no_options(self, net_mock):
@ -113,7 +107,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2):
arglist = []
verifylist = []
# Missing required args should raise exception here
self.assertRaises(
tests_utils.ParserException,
self.check_parser,
@ -131,7 +124,6 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2):
('name', self._network['label']),
]
# Missing required args should raise exception here
self.assertRaises(
tests_utils.ParserException,
self.check_parser,
@ -156,35 +148,29 @@ class TestCreateNetworkCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
net_mock.assert_called_once_with(
**{
'subnet': self._network['cidr'],
'name': self._network['label'],
}
self.compute_sdk_client,
subnet=self._network['cidr'],
name=self._network['label'],
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch('openstackclient.api.compute_v2.APIv2.network_delete')
@mock.patch.object(compute_v2, 'delete_network')
@mock.patch.object(compute_v2, 'find_network')
class TestDeleteNetworkCompute(compute_fakes.TestComputev2):
def setUp(self):
super().setUp()
self.app.client_manager.network_endpoint_enabled = False
# The networks to delete
self._networks = compute_fakes.create_networks(count=3)
# Return value of utils.find_resource()
self.compute_client.api.network_find = compute_fakes.get_networks(
networks=self._networks
)
# Get the command object to test
self.cmd = network.DeleteNetwork(self.app, None)
def test_network_delete_one(self, net_mock):
net_mock.return_value = mock.Mock(return_value=None)
def test_network_delete_one(self, find_net_mock, delete_net_mock):
find_net_mock.side_effect = self._networks
delete_net_mock.return_value = mock.Mock(return_value=None)
arglist = [
self._networks[0]['label'],
]
@ -195,35 +181,44 @@ class TestDeleteNetworkCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
net_mock.assert_called_once_with(
self._networks[0]['label'],
delete_net_mock.assert_called_once_with(
self.compute_sdk_client,
self._networks[0]['id'],
)
self.assertIsNone(result)
def test_network_delete_multi(self, net_mock):
net_mock.return_value = mock.Mock(return_value=None)
arglist = []
for n in self._networks:
arglist.append(n['id'])
def test_network_delete_multi(self, find_net_mock, delete_net_mock):
find_net_mock.side_effect = self._networks
delete_net_mock.return_value = mock.Mock(return_value=None)
arglist = [
self._networks[0]['id'],
self._networks[1]['id'],
]
verifylist = [
('network', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for n in self._networks:
calls.append(call(n['id']))
net_mock.assert_has_calls(calls)
delete_net_mock.assert_has_calls(
[
mock.call(self.compute_sdk_client, self._networks[0]['id']),
mock.call(self.compute_sdk_client, self._networks[1]['id']),
]
)
self.assertIsNone(result)
def test_network_delete_multi_with_exception(self, net_mock):
net_mock.return_value = mock.Mock(return_value=None)
net_mock.side_effect = [
mock.Mock(return_value=None),
exceptions.CommandError,
def test_network_delete_multi_with_exception(
self, find_net_mock, delete_net_mock
):
find_net_mock.side_effect = [
self._networks[0],
exceptions.NotFound('foo'),
self._networks[1],
]
delete_net_mock.return_value = mock.Mock(return_value=None)
arglist = [
self._networks[0]['id'],
'xxxx-yyyy-zzzz',
@ -234,20 +229,30 @@ class TestDeleteNetworkCompute(compute_fakes.TestComputev2):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('2 of 3 networks failed to delete.', str(e))
exc = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args,
)
self.assertEqual('1 of 3 networks failed to delete.', str(exc))
net_mock.assert_any_call(self._networks[0]['id'])
net_mock.assert_any_call(self._networks[1]['id'])
net_mock.assert_any_call('xxxx-yyyy-zzzz')
find_net_mock.assert_has_calls(
[
mock.call(self.compute_sdk_client, self._networks[0]['id']),
mock.call(self.compute_sdk_client, 'xxxx-yyyy-zzzz'),
mock.call(self.compute_sdk_client, self._networks[1]['id']),
]
)
delete_net_mock.assert_has_calls(
[
mock.call(self.compute_sdk_client, self._networks[0]['id']),
mock.call(self.compute_sdk_client, self._networks[1]['id']),
]
)
@mock.patch('openstackclient.api.compute_v2.APIv2.network_list')
@mock.patch.object(compute_v2, 'list_networks')
class TestListNetworkCompute(compute_fakes.TestComputev2):
# The networks going to be listed up.
_networks = compute_fakes.create_networks(count=3)
columns = (
@ -271,7 +276,6 @@ class TestListNetworkCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = network.ListNetwork(self.app, None)
def test_network_list_no_options(self, net_mock):
@ -280,19 +284,15 @@ class TestListNetworkCompute(compute_fakes.TestComputev2):
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# In base command class Lister in cliff, abstract method take_action()
# returns a tuple containing the column names and an iterable
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
net_mock.assert_called_once_with()
net_mock.assert_called_once_with(self.compute_sdk_client)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
@mock.patch('openstackclient.api.compute_v2.APIv2.network_find')
@mock.patch.object(compute_v2, 'find_network')
class TestShowNetworkCompute(compute_fakes.TestComputev2):
# The network to show.
_network = compute_fakes.create_one_network()
columns = (
@ -370,7 +370,6 @@ class TestShowNetworkCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = network.ShowNetwork(self.app, None)
def test_show_no_options(self, net_mock):
@ -398,6 +397,8 @@ class TestShowNetworkCompute(compute_fakes.TestComputev2):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
net_mock.assert_called_once_with(self._network['label'])
net_mock.assert_called_once_with(
self.compute_sdk_client, self._network['label']
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)

View File

@ -12,17 +12,17 @@
#
from unittest import mock
from unittest.mock import call
from osc_lib import exceptions
from openstackclient.api import compute_v2
from openstackclient.network.v2 import security_group
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit import utils as tests_utils
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_create')
@mock.patch.object(compute_v2, 'create_security_group')
class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2):
project = identity_fakes.FakeProject.create_one_project()
domain = identity_fakes.FakeDomain.create_one_domain()
@ -72,6 +72,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(
self.compute_sdk_client,
self._security_group['name'],
self._security_group['name'],
)
@ -94,6 +95,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(
self.compute_sdk_client,
self._security_group['name'],
self._security_group['description'],
)
@ -101,7 +103,7 @@ class TestCreateSecurityGroupCompute(compute_fakes.TestComputev2):
self.assertCountEqual(self.data, data)
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_delete')
@mock.patch.object(compute_v2, 'delete_security_group')
class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2):
# The security groups to be deleted.
_security_groups = compute_fakes.create_security_groups()
@ -111,8 +113,8 @@ class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
self.compute_client.api.security_group_find = (
compute_fakes.get_security_groups(self._security_groups)
compute_v2.find_security_group = mock.Mock(
side_effect=self._security_groups
)
# Get the command object to test
@ -131,59 +133,68 @@ class TestDeleteSecurityGroupCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(
self.compute_sdk_client,
self._security_groups[0]['id'],
)
self.assertIsNone(result)
def test_security_group_multi_delete(self, sg_mock):
sg_mock.return_value = mock.Mock(return_value=None)
arglist = []
verifylist = []
for s in self._security_groups:
arglist.append(s['id'])
arglist = [
self._security_groups[0]['id'],
self._security_groups[1]['id'],
]
verifylist = [
('group', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_groups:
calls.append(call(s['id']))
sg_mock.assert_has_calls(calls)
sg_mock.assert_has_calls(
[
mock.call(
self.compute_sdk_client, self._security_groups[0]['id']
),
mock.call(
self.compute_sdk_client, self._security_groups[1]['id']
),
]
)
self.assertIsNone(result)
def test_security_group_multi_delete_with_exception(self, sg_mock):
sg_mock.return_value = mock.Mock(return_value=None)
sg_mock.side_effect = [
mock.Mock(return_value=None),
exceptions.CommandError,
compute_v2.find_security_group.side_effect = [
self._security_groups[0],
exceptions.NotFound('foo'),
]
arglist = [
self._security_groups[0]['id'],
'unexist_security_group',
]
verifylist = [
(
'group',
[self._security_groups[0]['id'], 'unexist_security_group'],
),
('group', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
exc = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args,
)
self.assertEqual('1 of 2 groups failed to delete.', str(exc))
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 groups failed to delete.', str(e))
sg_mock.assert_any_call(self._security_groups[0]['id'])
sg_mock.assert_any_call('unexist_security_group')
sg_mock.assert_has_calls(
[
mock.call(
self.compute_sdk_client, self._security_groups[0]['id']
),
]
)
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_list')
@mock.patch.object(compute_v2, 'list_security_groups')
class TestListSecurityGroupCompute(compute_fakes.TestComputev2):
# The security group to be listed.
_security_groups = compute_fakes.create_security_groups(count=3)
@ -238,8 +249,9 @@ class TestListSecurityGroupCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': False}}
sg_mock.assert_called_once_with(**kwargs)
sg_mock.assert_called_once_with(
self.compute_sdk_client, all_projects=False
)
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.data, list(data))
@ -255,13 +267,14 @@ class TestListSecurityGroupCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': True}}
sg_mock.assert_called_once_with(**kwargs)
sg_mock.assert_called_once_with(
self.compute_sdk_client, all_projects=True
)
self.assertEqual(self.columns_all_projects, columns)
self.assertCountEqual(self.data_all_projects, list(data))
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_set')
@mock.patch.object(compute_v2, 'update_security_group')
class TestSetSecurityGroupCompute(compute_fakes.TestComputev2):
# The security group to be set.
_security_group = compute_fakes.create_one_security_group()
@ -271,7 +284,7 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
self.compute_client.api.security_group_find = mock.Mock(
compute_v2.find_security_group = mock.Mock(
return_value=self._security_group
)
@ -296,9 +309,7 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(
self._security_group,
self._security_group['name'],
self._security_group['description'],
self.compute_sdk_client, self._security_group['id']
)
self.assertIsNone(result)
@ -323,12 +334,15 @@ class TestSetSecurityGroupCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(
self._security_group, new_name, new_description
self.compute_sdk_client,
self._security_group['id'],
name=new_name,
description=new_description,
)
self.assertIsNone(result)
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_find')
@mock.patch.object(compute_v2, 'find_security_group')
class TestShowSecurityGroupCompute(compute_fakes.TestComputev2):
# The security group rule to be shown with the group.
_security_group_rule = compute_fakes.create_one_security_group_rule()
@ -379,6 +393,8 @@ class TestShowSecurityGroupCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
sg_mock.assert_called_once_with(self._security_group['id'])
sg_mock.assert_called_once_with(
self.compute_sdk_client, self._security_group['id']
)
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.data, data)

View File

@ -9,13 +9,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from unittest.mock import call
from osc_lib import exceptions
from openstackclient.api import compute_v2
from openstackclient.network import utils as network_utils
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
@ -23,7 +22,7 @@ from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit import utils as tests_utils
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_rule_create')
@mock.patch.object(compute_v2, 'create_security_group_rule')
class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
project = identity_fakes.FakeProject.create_one_project()
domain = identity_fakes.FakeDomain.create_one_domain()
@ -51,7 +50,7 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
self.compute_client.api.security_group_find = mock.Mock(
compute_v2.find_security_group = mock.Mock(
return_value=self._security_group,
)
@ -159,9 +158,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
# TODO(dtroyer): save this for the security group rule changes
# self.compute_client.api.security_group_rule_create.assert_called_once_with(
sgr_mock.assert_called_once_with(
self.compute_sdk_client,
security_group_id=self._security_group['id'],
ip_protocol=self._security_group_rule['ip_protocol'],
from_port=self._security_group_rule['from_port'],
@ -203,9 +201,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
# TODO(dtroyer): save this for the security group rule changes
# self.compute_client.api.security_group_rule_create.assert_called_once_with(
sgr_mock.assert_called_once_with(
self.compute_sdk_client,
security_group_id=self._security_group['id'],
ip_protocol=self._security_group_rule['ip_protocol'],
from_port=self._security_group_rule['from_port'],
@ -242,9 +239,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
# TODO(dtroyer): save this for the security group rule changes
# self.compute_client.api.security_group_rule_create.assert_called_once_with(
sgr_mock.assert_called_once_with(
self.compute_sdk_client,
security_group_id=self._security_group['id'],
ip_protocol=self._security_group_rule['ip_protocol'],
from_port=self._security_group_rule['from_port'],
@ -282,9 +278,8 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
# TODO(dtroyer): save this for the security group rule changes
# self.compute_client.api.security_group_rule_create.assert_called_once_with(
sgr_mock.assert_called_once_with(
self.compute_sdk_client,
security_group_id=self._security_group['id'],
ip_protocol=self._security_group_rule['ip_protocol'],
from_port=self._security_group_rule['from_port'],
@ -296,7 +291,7 @@ class TestCreateSecurityGroupRuleCompute(compute_fakes.TestComputev2):
self.assertEqual(expected_data, data)
@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_rule_delete')
@mock.patch.object(compute_v2, 'delete_security_group_rule')
class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2):
# The security group rule to be deleted.
_security_group_rules = compute_fakes.create_security_group_rules(count=2)
@ -320,26 +315,35 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2):
result = self.cmd.take_action(parsed_args)
sgr_mock.assert_called_once_with(self._security_group_rules[0]['id'])
sgr_mock.assert_called_once_with(
self.compute_sdk_client, self._security_group_rules[0]['id']
)
self.assertIsNone(result)
def test_security_group_rule_delete_multi(self, sgr_mock):
arglist = []
verifylist = []
for s in self._security_group_rules:
arglist.append(s['id'])
arglist = [
self._security_group_rules[0]['id'],
self._security_group_rules[1]['id'],
]
verifylist = [
('rule', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_group_rules:
calls.append(call(s['id']))
sgr_mock.assert_has_calls(calls)
sgr_mock.assert_has_calls(
[
mock.call(
self.compute_sdk_client,
self._security_group_rules[0]['id'],
),
mock.call(
self.compute_sdk_client,
self._security_group_rules[1]['id'],
),
]
)
self.assertIsNone(result)
def test_security_group_rule_delete_multi_with_exception(self, sgr_mock):
@ -348,12 +352,11 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2):
'unexist_rule',
]
verifylist = [
('rule', [self._security_group_rules[0]['id'], 'unexist_rule']),
('rule', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [None, exceptions.CommandError]
sgr_mock.side_effect = find_mock_result
sgr_mock.side_effect = [None, exceptions.NotFound('foo')]
try:
self.cmd.take_action(parsed_args)
@ -361,8 +364,15 @@ class TestDeleteSecurityGroupRuleCompute(compute_fakes.TestComputev2):
except exceptions.CommandError as e:
self.assertEqual('1 of 2 rules failed to delete.', str(e))
sgr_mock.assert_any_call(self._security_group_rules[0]['id'])
sgr_mock.assert_any_call('unexist_rule')
sgr_mock.assert_has_calls(
[
mock.call(
self.compute_sdk_client,
self._security_group_rules[0]['id'],
),
mock.call(self.compute_sdk_client, 'unexist_rule'),
]
)
class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
@ -432,10 +442,10 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
self.app.client_manager.network_endpoint_enabled = False
self.compute_client.api.security_group_find = mock.Mock(
compute_v2.find_security_group = mock.Mock(
return_value=self._security_group,
)
self.compute_client.api.security_group_list = mock.Mock(
compute_v2.list_security_groups = mock.Mock(
return_value=[self._security_group],
)
@ -446,8 +456,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
parsed_args = self.check_parser(self.cmd, [], [])
columns, data = self.cmd.take_action(parsed_args)
self.compute_client.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': False}
compute_v2.list_security_groups.assert_called_once_with(
self.compute_sdk_client, all_projects=False
)
self.assertEqual(self.expected_columns_no_group, columns)
self.assertEqual(self.expected_data_no_group, list(data))
@ -462,8 +472,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.compute_client.api.security_group_find.assert_called_once_with(
self._security_group['id']
compute_v2.find_security_group.assert_called_once_with(
self.compute_sdk_client, self._security_group['id']
)
self.assertEqual(self.expected_columns_with_group, columns)
self.assertEqual(self.expected_data_with_group, list(data))
@ -478,8 +488,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.compute_client.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': True}
compute_v2.list_security_groups.assert_called_once_with(
self.compute_sdk_client, all_projects=True
)
self.assertEqual(self.expected_columns_no_group, columns)
self.assertEqual(self.expected_data_no_group, list(data))
@ -494,8 +504,8 @@ class TestListSecurityGroupRuleCompute(compute_fakes.TestComputev2):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.compute_client.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': False}
compute_v2.list_security_groups.assert_called_once_with(
self.compute_sdk_client, all_projects=False
)
self.assertEqual(self.expected_columns_no_group, columns)
self.assertEqual(self.expected_data_no_group, list(data))
@ -517,7 +527,7 @@ class TestShowSecurityGroupRuleCompute(compute_fakes.TestComputev2):
# Build a security group fake customized for this test.
security_group_rules = [self._security_group_rule]
security_group = {'rules': security_group_rules}
self.compute_client.api.security_group_list = mock.Mock(
compute_v2.list_security_groups = mock.Mock(
return_value=[security_group],
)
@ -540,6 +550,8 @@ class TestShowSecurityGroupRuleCompute(compute_fakes.TestComputev2):
columns, data = self.cmd.take_action(parsed_args)
self.compute_client.api.security_group_list.assert_called_once_with()
compute_v2.list_security_groups.assert_called_once_with(
self.compute_sdk_client
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)