Support Neutron Local IP CRUD
Add support for Neutron Local IP CRUD operations. Partial-Bug: #1930200 Depends-On: https://review.opendev.org/c/openstack/neutron/+/804523 Depends-On: https://review.opendev.org/c/openstack/openstacksdk/+/804988 Change-Id: I1095100efb27b8559412469f0a9d07fc0a3db9d5
This commit is contained in:
parent
f00dce9e2f
commit
26144743d9
11
doc/source/cli/command-objects/local-ip-association.rst
Normal file
11
doc/source/cli/command-objects/local-ip-association.rst
Normal file
@ -0,0 +1,11 @@
|
||||
=============================================
|
||||
Local IP Associations (local_ip_associations)
|
||||
=============================================
|
||||
|
||||
The resource lets users assign Local IPs to user Ports.
|
||||
This is a sub-resource of the Local IP resource.
|
||||
|
||||
Network v2
|
||||
|
||||
.. autoprogram-cliff:: openstack.network.v2
|
||||
:command: local ip association *
|
12
doc/source/cli/command-objects/local-ip.rst
Normal file
12
doc/source/cli/command-objects/local-ip.rst
Normal file
@ -0,0 +1,12 @@
|
||||
=====================
|
||||
Local IPs (local_ips)
|
||||
=====================
|
||||
|
||||
Extension that allows users to create a virtual IP that can later be assigned
|
||||
to multiple ports/VMs (similar to anycast IP) and is guaranteed to only be
|
||||
reachable within the same physical server/node boundaries
|
||||
|
||||
Network v2
|
||||
|
||||
.. autoprogram-cliff:: openstack.network.v2
|
||||
:command: local ip *
|
@ -38,7 +38,7 @@ msgpack-python==0.4.0
|
||||
munch==2.1.0
|
||||
netaddr==0.7.18
|
||||
netifaces==0.10.4
|
||||
openstacksdk==0.56.0
|
||||
openstacksdk==0.61.0
|
||||
os-client-config==2.1.0
|
||||
os-service-types==1.7.0
|
||||
osc-lib==2.3.0
|
||||
|
310
openstackclient/network/v2/local_ip.py
Normal file
310
openstackclient/network/v2/local_ip.py
Normal file
@ -0,0 +1,310 @@
|
||||
# Copyright 2021 Huawei, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Node Local IP action implementations"""
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from openstackclient.i18n import _
|
||||
from openstackclient.identity import common as identity_common
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map = {}
|
||||
hidden_columns = ['location']
|
||||
return utils.get_osc_show_columns_for_sdk_resource(
|
||||
item, column_map, hidden_columns)
|
||||
|
||||
|
||||
def _get_attrs(client_manager, parsed_args):
|
||||
attrs = {}
|
||||
network_client = client_manager.network
|
||||
|
||||
if parsed_args.name:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description:
|
||||
attrs['description'] = parsed_args.description
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
identity_client = client_manager.identity
|
||||
project_id = identity_common.find_project(
|
||||
identity_client,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['project_id'] = project_id
|
||||
if parsed_args.network:
|
||||
network = network_client.find_network(parsed_args.network,
|
||||
ignore_missing=False)
|
||||
attrs['network_id'] = network.id
|
||||
if parsed_args.local_ip_address:
|
||||
attrs['local_ip_address'] = parsed_args.local_ip_address
|
||||
if parsed_args.local_port:
|
||||
port = network_client.find_port(parsed_args.local_port,
|
||||
ignore_missing=False)
|
||||
attrs['local_port_id'] = port.id
|
||||
if parsed_args.ip_mode:
|
||||
attrs['ip_mode'] = parsed_args.ip_mode
|
||||
return attrs
|
||||
|
||||
|
||||
class CreateLocalIP(command.ShowOne):
|
||||
_description = _("Create Local IP")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar="<name>",
|
||||
help=_("New local IP name")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar="<description>",
|
||||
help=_("New local IP description")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--network',
|
||||
metavar='<network>',
|
||||
help=_("Network to allocate Local IP (name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--local-port',
|
||||
metavar='<local_port>',
|
||||
help=_("Port to allocate Local IP (name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
"--local-ip-address",
|
||||
metavar="<local_ip_address>",
|
||||
help=_("IP address or CIDR "),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--ip-mode',
|
||||
metavar='<ip_mode>',
|
||||
help=_("local IP ip mode")
|
||||
)
|
||||
|
||||
identity_common.add_project_domain_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_attrs(self.app.client_manager, parsed_args)
|
||||
|
||||
obj = client.create_local_ip(**attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = utils.get_item_properties(obj, columns, formatters={})
|
||||
|
||||
return (display_columns, data)
|
||||
|
||||
|
||||
class DeleteLocalIP(command.Command):
|
||||
_description = _("Delete local IP(s)")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar="<local-ip>",
|
||||
nargs='+',
|
||||
help=_("Local IP(s) to delete (name or ID)")
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
|
||||
for lip in parsed_args.local_ip:
|
||||
try:
|
||||
obj = client.find_local_ip(lip, ignore_missing=False)
|
||||
client.delete_local_ip(obj)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete Local IP with "
|
||||
"name or ID '%(lip)s': %(e)s"),
|
||||
{'lip': lip, 'e': e})
|
||||
|
||||
if result > 0:
|
||||
total = len(parsed_args.local_ip)
|
||||
msg = (_("%(result)s of %(total)s local IPs failed "
|
||||
"to delete.") % {'result': result, 'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class SetLocalIP(command.Command):
|
||||
_description = _("Set local ip properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar="<local-ip>",
|
||||
help=_("Local IP to modify (name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar="<name>",
|
||||
help=_('Set local IP name')
|
||||
)
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar="<description>",
|
||||
help=_('Set local IP description')
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
obj = client.find_local_ip(
|
||||
parsed_args.local_ip,
|
||||
ignore_missing=False)
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if attrs:
|
||||
client.update_local_ip(obj, **attrs)
|
||||
|
||||
|
||||
class ListLocalIP(command.Lister):
|
||||
_description = _("List local IPs")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar='<name>',
|
||||
help=_("List only local IPs of given name in output")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--project',
|
||||
metavar="<project>",
|
||||
help=_("List Local IPs according to their project "
|
||||
"(name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--network',
|
||||
metavar='<network>',
|
||||
help=_("List Local IP(s) according to "
|
||||
"given network (name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--local-port',
|
||||
metavar='<local_port>',
|
||||
help=_("List Local IP(s) according to "
|
||||
"given port (name or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--local-ip-address',
|
||||
metavar='<local_ip_address>',
|
||||
help=_("List Local IP(s) according to "
|
||||
"given Local IP Address")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--ip-mode',
|
||||
metavar='<ip_mode>',
|
||||
help=_("List Local IP(s) according to "
|
||||
"given IP mode")
|
||||
)
|
||||
|
||||
identity_common.add_project_domain_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'description',
|
||||
'project_id',
|
||||
'local_port_id',
|
||||
'network_id',
|
||||
'local_ip_address',
|
||||
'ip_mode',
|
||||
)
|
||||
column_headers = (
|
||||
'ID',
|
||||
'Name',
|
||||
'Description',
|
||||
'Project',
|
||||
'Local Port ID',
|
||||
'Network',
|
||||
'Local IP address',
|
||||
'IP mode',
|
||||
)
|
||||
attrs = {}
|
||||
if parsed_args.name:
|
||||
attrs['name'] = parsed_args.name
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
identity_client = self.app.client_manager.identity
|
||||
project_id = identity_common.find_project(
|
||||
identity_client,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['project_id'] = project_id
|
||||
if parsed_args.network is not None:
|
||||
network = client.find_network(parsed_args.network,
|
||||
ignore_missing=False)
|
||||
attrs['network_id'] = network.id
|
||||
if parsed_args.local_port:
|
||||
port = client.find_port(parsed_args.local_port,
|
||||
ignore_missing=False)
|
||||
attrs['local_port_id'] = port.id
|
||||
if parsed_args.local_ip_address:
|
||||
attrs['local_ip_address'] = parsed_args.local_ip_address
|
||||
if parsed_args.ip_mode:
|
||||
attrs['ip_mode'] = parsed_args.ip_mode
|
||||
data = client.local_ips(**attrs)
|
||||
|
||||
return (column_headers,
|
||||
(utils.get_item_properties(s,
|
||||
columns,
|
||||
formatters={},) for s in data))
|
||||
|
||||
|
||||
class ShowLocalIP(command.ShowOne):
|
||||
_description = _("Display local IP details")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar="<local-ip>",
|
||||
help=_("Local IP to display (name or ID)")
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
obj = client.find_local_ip(
|
||||
parsed_args.local_ip,
|
||||
ignore_missing=False)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = utils.get_item_properties(obj, columns, formatters={})
|
||||
|
||||
return (display_columns, data)
|
197
openstackclient/network/v2/local_ip_association.py
Normal file
197
openstackclient/network/v2/local_ip_association.py
Normal file
@ -0,0 +1,197 @@
|
||||
# Copyright 2021 Huawei, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Node Local IP action implementations"""
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from openstackclient.i18n import _
|
||||
from openstackclient.identity import common as identity_common
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map = {}
|
||||
hidden_columns = ['location', 'name', 'id']
|
||||
return utils.get_osc_show_columns_for_sdk_resource(
|
||||
item, column_map, hidden_columns)
|
||||
|
||||
|
||||
class CreateLocalIPAssociation(command.ShowOne):
|
||||
_description = _("Create Local IP Association")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar='<local-ip>',
|
||||
help=_("Local IP that the port association belongs to "
|
||||
"(IP address or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'fixed_port',
|
||||
metavar='<fixed_port>',
|
||||
help=_("The ID or Name of Port to allocate Local IP Association")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--fixed-ip',
|
||||
metavar='<fixed_ip>',
|
||||
help=_("Fixed IP for Local IP Association")
|
||||
)
|
||||
|
||||
identity_common.add_project_domain_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
|
||||
attrs = {}
|
||||
port = client.find_port(parsed_args.fixed_port,
|
||||
ignore_missing=False)
|
||||
attrs['fixed_port_id'] = port.id
|
||||
if parsed_args.fixed_ip:
|
||||
attrs['fixed_ip'] = parsed_args.fixed_ip
|
||||
local_ip = client.find_local_ip(
|
||||
parsed_args.local_ip,
|
||||
ignore_missing=False,
|
||||
)
|
||||
obj = client.create_local_ip_association(local_ip.id, **attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = utils.get_item_properties(obj, columns, formatters={})
|
||||
|
||||
return (display_columns, data)
|
||||
|
||||
|
||||
class DeleteLocalIPAssociation(command.Command):
|
||||
_description = _("Delete Local IP association(s)")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar="<local_ip>",
|
||||
help=_("Local IP that the port association belongs to "
|
||||
"(IP address or ID)")
|
||||
)
|
||||
parser.add_argument(
|
||||
'fixed_port_id',
|
||||
nargs="+",
|
||||
metavar="<fixed_port_id>",
|
||||
help=_("The fixed port ID of Local IP Association")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
local_ip = client.find_local_ip(
|
||||
parsed_args.local_ip,
|
||||
ignore_missing=False,
|
||||
)
|
||||
result = 0
|
||||
|
||||
for fixed_port_id in parsed_args.fixed_port_id:
|
||||
try:
|
||||
client.delete_local_ip_association(
|
||||
local_ip.id,
|
||||
fixed_port_id,
|
||||
ignore_missing=False,
|
||||
)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete Local IP Association with "
|
||||
"fixed port "
|
||||
"name or ID '%(fixed_port_id)s': %(e)s"),
|
||||
{'fixed port ID': fixed_port_id, 'e': e})
|
||||
|
||||
if result > 0:
|
||||
total = len(parsed_args.fixed_port_id)
|
||||
msg = (_("%(result)s of %(total)s Local IP Associations failed "
|
||||
"to delete.") % {'result': result, 'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListLocalIPAssociation(command.Lister):
|
||||
_description = _("List Local IP Associations")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
|
||||
parser.add_argument(
|
||||
'local_ip',
|
||||
metavar='<local_ip>',
|
||||
help=_("Local IP that port associations belongs to")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--fixed-port',
|
||||
metavar='<fixed_port>',
|
||||
help=_("Filter the list result by the ID or name of "
|
||||
"the fixed port")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--fixed-ip',
|
||||
metavar='<fixed_ip>',
|
||||
help=_("Filter the list result by fixed ip")
|
||||
)
|
||||
parser.add_argument(
|
||||
'--host',
|
||||
metavar='<host>',
|
||||
help=_("Filter the list result by given host")
|
||||
)
|
||||
identity_common.add_project_domain_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
columns = (
|
||||
'local_ip_id',
|
||||
'local_ip_address',
|
||||
'fixed_port_id',
|
||||
'fixed_ip',
|
||||
'host',
|
||||
)
|
||||
column_headers = (
|
||||
'Local IP ID',
|
||||
'Local IP Address',
|
||||
'Fixed port ID',
|
||||
'Fixed IP',
|
||||
'Host'
|
||||
)
|
||||
attrs = {}
|
||||
obj = client.find_local_ip(
|
||||
parsed_args.local_ip,
|
||||
ignore_missing=False,
|
||||
)
|
||||
if parsed_args.fixed_port:
|
||||
port = client.find_port(parsed_args.fixed_port,
|
||||
ignore_missing=False)
|
||||
attrs['fixed_port_id'] = port.id
|
||||
if parsed_args.fixed_ip:
|
||||
attrs['fixed_ip'] = parsed_args.fixed_ip
|
||||
if parsed_args.host:
|
||||
attrs['host'] = parsed_args.host
|
||||
|
||||
data = client.local_ip_associations(obj, **attrs)
|
||||
|
||||
return (column_headers,
|
||||
(utils.get_item_properties(s,
|
||||
columns,
|
||||
formatters={}) for s in data))
|
161
openstackclient/tests/functional/network/v2/test_local_ip.py
Normal file
161
openstackclient/tests/functional/network/v2/test_local_ip.py
Normal file
@ -0,0 +1,161 @@
|
||||
# Copyright 2021 Huawei, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from openstackclient.tests.functional.network.v2 import common
|
||||
|
||||
|
||||
class LocalIPTests(common.NetworkTests):
|
||||
"""Functional tests for local IP"""
|
||||
|
||||
def setUp(self):
|
||||
super(LocalIPTests, self).setUp()
|
||||
# Nothing in this class works with Nova Network
|
||||
if not self.haz_network:
|
||||
self.skipTest("No Network service present")
|
||||
if not self.is_extension_enabled('local-ip'):
|
||||
self.skipTest("No local-ip extension present")
|
||||
|
||||
def test_local_ip_create_and_delete(self):
|
||||
"""Test create, delete multiple"""
|
||||
name1 = uuid.uuid4().hex
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip create -f json ' +
|
||||
name1
|
||||
))
|
||||
self.assertEqual(
|
||||
name1,
|
||||
cmd_output['name'],
|
||||
)
|
||||
|
||||
name2 = uuid.uuid4().hex
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip create -f json ' +
|
||||
name2
|
||||
))
|
||||
self.assertEqual(
|
||||
name2,
|
||||
cmd_output['name'],
|
||||
)
|
||||
|
||||
raw_output = self.openstack(
|
||||
'local ip delete ' + name1 + ' ' + name2,
|
||||
)
|
||||
self.assertOutput('', raw_output)
|
||||
|
||||
def test_local_ip_list(self):
|
||||
"""Test create, list filters, delete"""
|
||||
# Get project IDs
|
||||
cmd_output = json.loads(self.openstack('token issue -f json '))
|
||||
auth_project_id = cmd_output['project_id']
|
||||
|
||||
cmd_output = json.loads(self.openstack('project list -f json '))
|
||||
admin_project_id = None
|
||||
demo_project_id = None
|
||||
for p in cmd_output:
|
||||
if p['Name'] == 'admin':
|
||||
admin_project_id = p['ID']
|
||||
if p['Name'] == 'demo':
|
||||
demo_project_id = p['ID']
|
||||
|
||||
# Verify assumptions:
|
||||
# * admin and demo projects are present
|
||||
# * demo and admin are distinct projects
|
||||
# * tests run as admin
|
||||
self.assertIsNotNone(admin_project_id)
|
||||
self.assertIsNotNone(demo_project_id)
|
||||
self.assertNotEqual(admin_project_id, demo_project_id)
|
||||
self.assertEqual(admin_project_id, auth_project_id)
|
||||
|
||||
name1 = uuid.uuid4().hex
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip create -f json ' +
|
||||
name1
|
||||
))
|
||||
self.addCleanup(self.openstack, 'local ip delete ' + name1)
|
||||
self.assertEqual(
|
||||
admin_project_id,
|
||||
cmd_output["project_id"],
|
||||
)
|
||||
|
||||
name2 = uuid.uuid4().hex
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip create -f json ' +
|
||||
'--project ' + demo_project_id +
|
||||
' ' + name2
|
||||
))
|
||||
self.addCleanup(self.openstack, 'local ip delete ' + name2)
|
||||
self.assertEqual(
|
||||
demo_project_id,
|
||||
cmd_output["project_id"],
|
||||
)
|
||||
|
||||
# Test list
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip list -f json ',
|
||||
))
|
||||
names = [x["Name"] for x in cmd_output]
|
||||
self.assertIn(name1, names)
|
||||
self.assertIn(name2, names)
|
||||
|
||||
# Test list --project
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip list -f json ' +
|
||||
'--project ' + demo_project_id
|
||||
))
|
||||
names = [x["Name"] for x in cmd_output]
|
||||
self.assertNotIn(name1, names)
|
||||
self.assertIn(name2, names)
|
||||
|
||||
# Test list --name
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip list -f json ' +
|
||||
'--name ' + name1
|
||||
))
|
||||
names = [x["Name"] for x in cmd_output]
|
||||
self.assertIn(name1, names)
|
||||
self.assertNotIn(name2, names)
|
||||
|
||||
def test_local_ip_set_unset_and_show(self):
|
||||
"""Tests create options, set, and show"""
|
||||
name = uuid.uuid4().hex
|
||||
newname = name + "_"
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip create -f json ' +
|
||||
'--description aaaa ' +
|
||||
name
|
||||
))
|
||||
self.addCleanup(self.openstack, 'local ip delete ' + newname)
|
||||
self.assertEqual(name, cmd_output['name'])
|
||||
self.assertEqual('aaaa', cmd_output['description'])
|
||||
|
||||
# Test set name and description
|
||||
raw_output = self.openstack(
|
||||
'local ip set ' +
|
||||
'--name ' + newname + ' ' +
|
||||
'--description bbbb ' +
|
||||
name,
|
||||
)
|
||||
self.assertOutput('', raw_output)
|
||||
|
||||
# Show the updated local ip
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'local ip show -f json ' +
|
||||
newname,
|
||||
))
|
||||
self.assertEqual(newname, cmd_output['name'])
|
||||
self.assertEqual('bbbb', cmd_output['description'])
|
@ -18,6 +18,9 @@ from random import randint
|
||||
from unittest import mock
|
||||
import uuid
|
||||
|
||||
from openstack.network.v2 import local_ip as _local_ip
|
||||
from openstack.network.v2 import local_ip_association as _local_ip_association
|
||||
|
||||
from openstackclient.tests.unit import fakes
|
||||
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
|
||||
from openstackclient.tests.unit import utils
|
||||
@ -2033,3 +2036,138 @@ class FakeL3ConntrackHelper(object):
|
||||
)
|
||||
|
||||
return mock.Mock(side_effect=ct_helpers)
|
||||
|
||||
|
||||
def create_one_local_ip(attrs=None):
|
||||
"""Create a fake local ip.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A FakeResource object with name, id, etc.
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
local_ip_attrs = {
|
||||
'created_at': '2021-11-29T10:10:23.000000',
|
||||
'name': 'local-ip-name-' + uuid.uuid4().hex,
|
||||
'description': 'local-ip-description-' + uuid.uuid4().hex,
|
||||
'id': 'local-ip-id-' + uuid.uuid4().hex,
|
||||
'project_id': 'project-id-' + uuid.uuid4().hex,
|
||||
'local_port_id': 'local_port_id-' + uuid.uuid4().hex,
|
||||
'network_id': 'network_id-' + uuid.uuid4().hex,
|
||||
'local_ip_address': '10.0.0.1',
|
||||
'ip_mode': 'translate',
|
||||
'revision_number': 'local-ip-revision-number-' + uuid.uuid4().hex,
|
||||
'updated_at': '2021-11-29T10:10:25.000000',
|
||||
}
|
||||
|
||||
# Overwrite default attributes.
|
||||
local_ip_attrs.update(attrs)
|
||||
|
||||
local_ip = _local_ip.LocalIP(**local_ip_attrs)
|
||||
|
||||
return local_ip
|
||||
|
||||
|
||||
def create_local_ips(attrs=None, count=2):
|
||||
"""Create multiple fake local ips.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of local ips to fake
|
||||
:return:
|
||||
A list of FakeResource objects faking the local ips
|
||||
"""
|
||||
local_ips = []
|
||||
for i in range(0, count):
|
||||
local_ips.append(create_one_local_ip(attrs))
|
||||
|
||||
return local_ips
|
||||
|
||||
|
||||
def get_local_ips(local_ips=None, count=2):
|
||||
"""Get an iterable Mock object with a list of faked local ips.
|
||||
|
||||
If local ip list is provided, then initialize the Mock object
|
||||
with the list. Otherwise create one.
|
||||
|
||||
:param List local_ips:
|
||||
A list of FakeResource objects faking local ips
|
||||
:param int count:
|
||||
The number of local ips to fake
|
||||
:return:
|
||||
An iterable Mock object with side_effect set to a list of faked
|
||||
local ips
|
||||
"""
|
||||
if local_ips is None:
|
||||
local_ips = create_local_ips(count)
|
||||
return mock.Mock(side_effect=local_ips)
|
||||
|
||||
|
||||
def create_one_local_ip_association(attrs=None):
|
||||
"""Create a fake local ip association.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A FakeResource object with local_ip_id, local_ip_address, etc.
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
local_ip_association_attrs = {
|
||||
'local_ip_id': 'local-ip-id-' + uuid.uuid4().hex,
|
||||
'local_ip_address': '172.24.4.228',
|
||||
'fixed_port_id': 'fixed-port-id-' + uuid.uuid4().hex,
|
||||
'fixed_ip': '10.0.0.5',
|
||||
'host': 'host-' + uuid.uuid4().hex,
|
||||
}
|
||||
|
||||
# Overwrite default attributes.
|
||||
local_ip_association_attrs.update(attrs)
|
||||
|
||||
local_ip_association = (
|
||||
_local_ip_association.LocalIPAssociation(
|
||||
**local_ip_association_attrs))
|
||||
|
||||
return local_ip_association
|
||||
|
||||
|
||||
def create_local_ip_associations(attrs=None, count=2):
|
||||
"""Create multiple fake local ip associations.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of address groups to fake
|
||||
:return:
|
||||
A list of FakeResource objects faking the local ip associations
|
||||
"""
|
||||
local_ip_associations = []
|
||||
for i in range(0, count):
|
||||
local_ip_associations.append(create_one_local_ip_association(attrs))
|
||||
|
||||
return local_ip_associations
|
||||
|
||||
|
||||
def get_local_ip_associations(local_ip_associations=None, count=2):
|
||||
"""Get a list of faked local ip associations
|
||||
|
||||
If local ip association list is provided, then initialize
|
||||
the Mock object with the list. Otherwise create one.
|
||||
|
||||
:param List local_ip_associations:
|
||||
A list of FakeResource objects faking local ip associations
|
||||
:param int count:
|
||||
The number of local ip associations to fake
|
||||
:return:
|
||||
An iterable Mock object with side_effect set to a list of faked
|
||||
local ip associations
|
||||
"""
|
||||
if local_ip_associations is None:
|
||||
local_ip_associations = create_local_ip_associations(count)
|
||||
|
||||
return mock.Mock(side_effect=local_ip_associations)
|
||||
|
480
openstackclient/tests/unit/network/v2/test_local_ip.py
Normal file
480
openstackclient/tests/unit/network/v2/test_local_ip.py
Normal file
@ -0,0 +1,480 @@
|
||||
# Copyright 2021 Huawei, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from unittest import mock
|
||||
from unittest.mock import call
|
||||
|
||||
from osc_lib import exceptions
|
||||
|
||||
from openstackclient.network.v2 import local_ip
|
||||
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
|
||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||
from openstackclient.tests.unit import utils as tests_utils
|
||||
|
||||
|
||||
class TestLocalIP(network_fakes.TestNetworkV2):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
# Get a shortcut to the network client
|
||||
self.network = self.app.client_manager.network
|
||||
# Get a shortcut to the ProjectManager Mock
|
||||
self.projects_mock = self.app.client_manager.identity.projects
|
||||
# Get a shortcut to the DomainManager Mock
|
||||
self.domains_mock = self.app.client_manager.identity.domains
|
||||
|
||||
|
||||
class TestCreateLocalIP(TestLocalIP):
|
||||
project = identity_fakes_v3.FakeProject.create_one_project()
|
||||
domain = identity_fakes_v3.FakeDomain.create_one_domain()
|
||||
local_ip_network = network_fakes.FakeNetwork.create_one_network()
|
||||
port = network_fakes.FakePort.create_one_port()
|
||||
# The new local ip created.
|
||||
new_local_ip = network_fakes.create_one_local_ip(
|
||||
attrs={'project_id': project.id,
|
||||
'network_id': local_ip_network.id,
|
||||
'local_port_id': port.id})
|
||||
|
||||
columns = (
|
||||
'created_at',
|
||||
'description',
|
||||
'id',
|
||||
'name',
|
||||
'project_id',
|
||||
'local_port_id',
|
||||
'network_id',
|
||||
'local_ip_address',
|
||||
'ip_mode',
|
||||
'revision_number',
|
||||
'updated_at',
|
||||
)
|
||||
data = (
|
||||
new_local_ip.created_at,
|
||||
new_local_ip.description,
|
||||
new_local_ip.id,
|
||||
new_local_ip.name,
|
||||
new_local_ip.project_id,
|
||||
new_local_ip.local_port_id,
|
||||
new_local_ip.network_id,
|
||||
new_local_ip.local_ip_address,
|
||||
new_local_ip.ip_mode,
|
||||
new_local_ip.revision_number,
|
||||
new_local_ip.updated_at,
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network.create_local_ip = mock.Mock(
|
||||
return_value=self.new_local_ip)
|
||||
self.network.find_network = mock.Mock(
|
||||
return_value=self.local_ip_network)
|
||||
self.network.find_port = mock.Mock(
|
||||
return_value=self.port)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip.CreateLocalIP(self.app, self.namespace)
|
||||
|
||||
self.projects_mock.get.return_value = self.project
|
||||
self.domains_mock.get.return_value = self.domain
|
||||
|
||||
def test_create_no_options(self):
|
||||
parsed_args = self.check_parser(self.cmd, [], [])
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.network.create_local_ip.assert_called_once_with(**{})
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertItemsEqual(self.data, data)
|
||||
|
||||
def test_create_all_options(self):
|
||||
arglist = [
|
||||
'--project-domain', self.domain.name,
|
||||
'--description', self.new_local_ip.description,
|
||||
'--name', self.new_local_ip.name,
|
||||
'--network', self.new_local_ip.network_id,
|
||||
'--local-port', self.new_local_ip.local_port_id,
|
||||
'--local-ip-address', '10.0.0.1',
|
||||
'--ip-mode', self.new_local_ip.ip_mode,
|
||||
]
|
||||
verifylist = [
|
||||
('project_domain', self.domain.name),
|
||||
('description', self.new_local_ip.description),
|
||||
('name', self.new_local_ip.name),
|
||||
('network', self.new_local_ip.network_id),
|
||||
('local_port', self.new_local_ip.local_port_id),
|
||||
('local_ip_address', '10.0.0.1'),
|
||||
('ip_mode', self.new_local_ip.ip_mode),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.network.create_local_ip.assert_called_once_with(**{
|
||||
'name': self.new_local_ip.name,
|
||||
'description': self.new_local_ip.description,
|
||||
'network_id': self.new_local_ip.network_id,
|
||||
'local_port_id': self.new_local_ip.local_port_id,
|
||||
'local_ip_address': '10.0.0.1',
|
||||
'ip_mode': self.new_local_ip.ip_mode,
|
||||
})
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertItemsEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteLocalIP(TestLocalIP):
|
||||
# The local ip to delete.
|
||||
_local_ips = network_fakes.create_local_ips(count=2)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network.delete_local_ip = mock.Mock(return_value=None)
|
||||
self.network.find_local_ip = network_fakes.get_local_ips(
|
||||
local_ips=self._local_ips)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip.DeleteLocalIP(self.app, self.namespace)
|
||||
|
||||
def test_local_ip_delete(self):
|
||||
arglist = [
|
||||
self._local_ips[0].name,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', [self._local_ips[0].name]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
self.network.find_local_ip.assert_called_once_with(
|
||||
self._local_ips[0].name, ignore_missing=False)
|
||||
self.network.delete_local_ip.assert_called_once_with(
|
||||
self._local_ips[0])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multi_local_ips_delete(self):
|
||||
arglist = []
|
||||
|
||||
for a in self._local_ips:
|
||||
arglist.append(a.name)
|
||||
verifylist = [
|
||||
('local_ip', arglist),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
calls = []
|
||||
for a in self._local_ips:
|
||||
calls.append(call(a))
|
||||
self.network.delete_local_ip.assert_has_calls(calls)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multi_local_ips_delete_with_exception(self):
|
||||
arglist = [
|
||||
self._local_ips[0].name,
|
||||
'unexist_local_ip',
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip',
|
||||
[self._local_ips[0].name, 'unexist_local_ip']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
find_mock_result = [self._local_ips[0], exceptions.CommandError]
|
||||
self.network.find_local_ip = (
|
||||
mock.Mock(side_effect=find_mock_result)
|
||||
)
|
||||
|
||||
try:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.fail('CommandError should be raised.')
|
||||
except exceptions.CommandError as e:
|
||||
self.assertEqual('1 of 2 local IPs failed to delete.', str(e))
|
||||
|
||||
self.network.find_local_ip.assert_any_call(
|
||||
self._local_ips[0].name, ignore_missing=False)
|
||||
self.network.find_local_ip.assert_any_call(
|
||||
'unexist_local_ip', ignore_missing=False)
|
||||
self.network.delete_local_ip.assert_called_once_with(
|
||||
self._local_ips[0]
|
||||
)
|
||||
|
||||
|
||||
class TestListLocalIP(TestLocalIP):
|
||||
# The local ip to list up.
|
||||
local_ips = (
|
||||
network_fakes.create_local_ips(count=3))
|
||||
fake_network = network_fakes.FakeNetwork.create_one_network(
|
||||
{'id': 'fake_network_id'}
|
||||
)
|
||||
|
||||
columns = (
|
||||
'ID',
|
||||
'Name',
|
||||
'Description',
|
||||
'Project',
|
||||
'Local Port ID',
|
||||
'Network',
|
||||
'Local IP address',
|
||||
'IP mode',
|
||||
)
|
||||
data = []
|
||||
for lip in local_ips:
|
||||
data.append((
|
||||
lip.id,
|
||||
lip.name,
|
||||
lip.description,
|
||||
lip.project_id,
|
||||
lip.local_port_id,
|
||||
lip.network_id,
|
||||
lip.local_ip_address,
|
||||
lip.ip_mode,
|
||||
))
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network.local_ips = mock.Mock(
|
||||
return_value=self.local_ips)
|
||||
self.network.find_network = mock.Mock(
|
||||
return_value=self.fake_network
|
||||
)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip.ListLocalIP(self.app, self.namespace)
|
||||
|
||||
def test_local_ip_list(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(**{})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemsEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_list_name(self):
|
||||
arglist = [
|
||||
'--name', self.local_ips[0].name,
|
||||
]
|
||||
verifylist = [
|
||||
('name', self.local_ips[0].name),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(
|
||||
**{'name': self.local_ips[0].name})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemsEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_list_project(self):
|
||||
project = identity_fakes_v3.FakeProject.create_one_project()
|
||||
self.projects_mock.get.return_value = project
|
||||
arglist = [
|
||||
'--project', project.id,
|
||||
]
|
||||
verifylist = [
|
||||
('project', project.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(
|
||||
**{'project_id': project.id})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemsEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_project_domain(self):
|
||||
project = identity_fakes_v3.FakeProject.create_one_project()
|
||||
self.projects_mock.get.return_value = project
|
||||
arglist = [
|
||||
'--project', project.id,
|
||||
'--project-domain', project.domain_id,
|
||||
]
|
||||
verifylist = [
|
||||
('project', project.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
filters = {'project_id': project.id}
|
||||
|
||||
self.network.local_ips.assert_called_once_with(**filters)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemsEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_list_network(self):
|
||||
arglist = [
|
||||
'--network', 'fake_network_id',
|
||||
]
|
||||
verifylist = [
|
||||
('network', 'fake_network_id'),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(**{
|
||||
'network_id': 'fake_network_id',
|
||||
})
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_list_local_ip_address(self):
|
||||
arglist = [
|
||||
'--local-ip-address', self.local_ips[0].local_ip_address,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip_address', self.local_ips[0].local_ip_address),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(**{
|
||||
'local_ip_address': self.local_ips[0].local_ip_address,
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, list(data))
|
||||
|
||||
def test_local_ip_list_ip_mode(self):
|
||||
arglist = [
|
||||
'--ip-mode', self.local_ips[0].ip_mode,
|
||||
]
|
||||
verifylist = [
|
||||
('ip_mode', self.local_ips[0].ip_mode),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ips.assert_called_once_with(**{
|
||||
'ip_mode': self.local_ips[0].ip_mode,
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, list(data))
|
||||
|
||||
|
||||
class TestSetLocalIP(TestLocalIP):
|
||||
# The local ip to set.
|
||||
_local_ip = network_fakes.create_one_local_ip()
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network.update_local_ip = mock.Mock(return_value=None)
|
||||
self.network.find_local_ip = mock.Mock(
|
||||
return_value=self._local_ip)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip.SetLocalIP(self.app, self.namespace)
|
||||
|
||||
def test_set_nothing(self):
|
||||
arglist = [self._local_ip.name, ]
|
||||
verifylist = [
|
||||
('local_ip', self._local_ip.name),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.update_local_ip.assert_not_called()
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_name_and_description(self):
|
||||
arglist = [
|
||||
'--name', 'new_local_ip_name',
|
||||
'--description', 'new_local_ip_description',
|
||||
self._local_ip.name,
|
||||
]
|
||||
verifylist = [
|
||||
('name', 'new_local_ip_name'),
|
||||
('description', 'new_local_ip_description'),
|
||||
('local_ip', self._local_ip.name),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {
|
||||
'name': "new_local_ip_name",
|
||||
'description': 'new_local_ip_description',
|
||||
}
|
||||
self.network.update_local_ip.assert_called_with(
|
||||
self._local_ip, **attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowLocalIP(TestLocalIP):
|
||||
# The local ip to show.
|
||||
_local_ip = network_fakes.create_one_local_ip()
|
||||
columns = (
|
||||
'created_at',
|
||||
'description',
|
||||
'id',
|
||||
'name',
|
||||
'project_id',
|
||||
'local_port_id',
|
||||
'network_id',
|
||||
'local_ip_address',
|
||||
'ip_mode',
|
||||
'revision_number',
|
||||
'updated_at',
|
||||
)
|
||||
data = (
|
||||
_local_ip.created_at,
|
||||
_local_ip.description,
|
||||
_local_ip.id,
|
||||
_local_ip.name,
|
||||
_local_ip.project_id,
|
||||
_local_ip.local_port_id,
|
||||
_local_ip.network_id,
|
||||
_local_ip.local_ip_address,
|
||||
_local_ip.ip_mode,
|
||||
_local_ip.revision_number,
|
||||
_local_ip.updated_at,
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network.find_local_ip = mock.Mock(
|
||||
return_value=self._local_ip)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip.ShowLocalIP(self.app, self.namespace)
|
||||
|
||||
def test_show_no_options(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
|
||||
# Missing required args should bail here
|
||||
self.assertRaises(tests_utils.ParserException, self.check_parser,
|
||||
self.cmd, arglist, verifylist)
|
||||
|
||||
def test_show_all_options(self):
|
||||
arglist = [
|
||||
self._local_ip.name,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self._local_ip.name),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.find_local_ip.assert_called_once_with(
|
||||
self._local_ip.name, ignore_missing=False)
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertItemsEqual(self.data, list(data))
|
@ -0,0 +1,328 @@
|
||||
# Copyright 2021 Huawei, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from unittest import mock
|
||||
from unittest.mock import call
|
||||
|
||||
from osc_lib import exceptions
|
||||
|
||||
from openstackclient.network.v2 import local_ip_association
|
||||
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes_v2
|
||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||
|
||||
|
||||
class TestLocalIPAssociation(network_fakes.TestNetworkV2):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.network = self.app.client_manager.network
|
||||
self.local_ip = network_fakes.create_one_local_ip()
|
||||
self.fixed_port = network_fakes.FakePort.create_one_port()
|
||||
self.project = identity_fakes_v2.FakeProject.create_one_project()
|
||||
self.network.find_port = mock.Mock(return_value=self.fixed_port)
|
||||
|
||||
|
||||
class TestCreateLocalIPAssociation(TestLocalIPAssociation):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.new_local_ip_association = (
|
||||
network_fakes.create_one_local_ip_association(
|
||||
attrs={
|
||||
'fixed_port_id': self.fixed_port.id,
|
||||
'local_ip_id': self.local_ip.id,
|
||||
}
|
||||
)
|
||||
)
|
||||
self.network.create_local_ip_association = mock.Mock(
|
||||
return_value=self.new_local_ip_association)
|
||||
|
||||
self.network.find_local_ip = mock.Mock(
|
||||
return_value=self.local_ip
|
||||
)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip_association.CreateLocalIPAssociation(
|
||||
self.app, self.namespace)
|
||||
|
||||
self.columns = (
|
||||
'local_ip_address',
|
||||
'fixed_port_id',
|
||||
'fixed_ip',
|
||||
'host',
|
||||
)
|
||||
|
||||
self.data = (
|
||||
self.new_local_ip_association.local_ip_address,
|
||||
self.new_local_ip_association.fixed_port_id,
|
||||
self.new_local_ip_association.fixed_ip,
|
||||
self.new_local_ip_association.host,
|
||||
)
|
||||
|
||||
def test_create_no_options(self):
|
||||
arglist = [
|
||||
self.new_local_ip_association.local_ip_id,
|
||||
self.new_local_ip_association.fixed_port_id,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self.new_local_ip_association.local_ip_id),
|
||||
('fixed_port', self.new_local_ip_association.fixed_port_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.create_local_ip_association.\
|
||||
assert_called_once_with(
|
||||
self.new_local_ip_association.local_ip_id,
|
||||
**{
|
||||
'fixed_port_id':
|
||||
self.new_local_ip_association.fixed_port_id,
|
||||
})
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertEqual(set(self.data), set(data))
|
||||
|
||||
def test_create_all_options(self):
|
||||
arglist = [
|
||||
self.new_local_ip_association.local_ip_id,
|
||||
self.new_local_ip_association.fixed_port_id,
|
||||
'--fixed-ip', self.new_local_ip_association.fixed_ip,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self.new_local_ip_association.local_ip_id),
|
||||
('fixed_port', self.new_local_ip_association.fixed_port_id),
|
||||
('fixed_ip', self.new_local_ip_association.fixed_ip),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.create_local_ip_association.\
|
||||
assert_called_once_with(
|
||||
self.new_local_ip_association.local_ip_id,
|
||||
**{
|
||||
'fixed_port_id':
|
||||
self.new_local_ip_association.fixed_port_id,
|
||||
'fixed_ip':
|
||||
self.new_local_ip_association.fixed_ip,
|
||||
})
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertEqual(set(self.data), set(data))
|
||||
|
||||
|
||||
class TestDeleteLocalIPAssociation(TestLocalIPAssociation):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self._local_ip_association = (
|
||||
network_fakes.create_local_ip_associations(
|
||||
count=2, attrs={
|
||||
'local_ip_id': self.local_ip.id,
|
||||
}
|
||||
)
|
||||
)
|
||||
self.network.delete_local_ip_association = mock.Mock(
|
||||
return_value=None
|
||||
)
|
||||
|
||||
self.network.find_local_ip = mock.Mock(
|
||||
return_value=self.local_ip
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip_association.DeleteLocalIPAssociation(
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_local_ip_association_delete(self):
|
||||
arglist = [
|
||||
self.local_ip.id,
|
||||
self._local_ip_association[0].fixed_port_id,
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self.local_ip.id),
|
||||
('fixed_port_id', [self._local_ip_association[0].fixed_port_id]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.delete_local_ip_association.\
|
||||
assert_called_once_with(
|
||||
self.local_ip.id,
|
||||
self._local_ip_association[0].fixed_port_id,
|
||||
ignore_missing=False
|
||||
)
|
||||
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multi_local_ip_associations_delete(self):
|
||||
arglist = []
|
||||
fixed_port_id = []
|
||||
|
||||
arglist.append(str(self.local_ip))
|
||||
|
||||
for a in self._local_ip_association:
|
||||
arglist.append(a.fixed_port_id)
|
||||
fixed_port_id.append(a.fixed_port_id)
|
||||
|
||||
verifylist = [
|
||||
('local_ip', str(self.local_ip)),
|
||||
('fixed_port_id', fixed_port_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
calls = []
|
||||
for a in self._local_ip_association:
|
||||
calls.append(call(a.local_ip_id, a.fixed_port_id,
|
||||
ignore_missing=False))
|
||||
|
||||
self.network.delete_local_ip_association.assert_has_calls(calls)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multi_local_ip_association_delete_with_exception(self):
|
||||
arglist = [
|
||||
self.local_ip.id,
|
||||
self._local_ip_association[0].fixed_port_id,
|
||||
'unexist_fixed_port_id',
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self.local_ip.id),
|
||||
('fixed_port_id',
|
||||
[self._local_ip_association[0].fixed_port_id,
|
||||
'unexist_fixed_port_id']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
delete_mock_result = [None, exceptions.CommandError]
|
||||
|
||||
self.network.delete_local_ip_association = (
|
||||
mock.MagicMock(side_effect=delete_mock_result)
|
||||
)
|
||||
|
||||
try:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.fail('CommandError should be raised.')
|
||||
except exceptions.CommandError as e:
|
||||
self.assertEqual(
|
||||
'1 of 2 Local IP Associations failed to delete.',
|
||||
str(e)
|
||||
)
|
||||
|
||||
self.network.delete_local_ip_association.\
|
||||
assert_any_call(
|
||||
self.local_ip.id,
|
||||
'unexist_fixed_port_id',
|
||||
ignore_missing=False
|
||||
)
|
||||
self.network.delete_local_ip_association.\
|
||||
assert_any_call(
|
||||
self.local_ip.id,
|
||||
self._local_ip_association[0].fixed_port_id,
|
||||
ignore_missing=False
|
||||
)
|
||||
|
||||
|
||||
class TestListLocalIPAssociation(TestLocalIPAssociation):
|
||||
|
||||
columns = (
|
||||
'Local IP ID',
|
||||
'Local IP Address',
|
||||
'Fixed port ID',
|
||||
'Fixed IP',
|
||||
'Host'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.local_ip_associations = (
|
||||
network_fakes.create_local_ip_associations(
|
||||
count=3, attrs={
|
||||
'local_ip_id': self.local_ip.id,
|
||||
'fixed_port_id': self.fixed_port.id,
|
||||
}
|
||||
)
|
||||
)
|
||||
self.data = []
|
||||
for lip_assoc in self.local_ip_associations:
|
||||
self.data.append((
|
||||
lip_assoc.local_ip_id,
|
||||
lip_assoc.local_ip_address,
|
||||
lip_assoc.fixed_port_id,
|
||||
lip_assoc.fixed_ip,
|
||||
lip_assoc.host,
|
||||
))
|
||||
self.network.local_ip_associations = mock.Mock(
|
||||
return_value=self.local_ip_associations
|
||||
)
|
||||
self.network.find_local_ip = mock.Mock(
|
||||
return_value=self.local_ip
|
||||
)
|
||||
self.network.find_port = mock.Mock(
|
||||
return_value=self.fixed_port
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = local_ip_association.ListLocalIPAssociation(
|
||||
self.app,
|
||||
self.namespace
|
||||
)
|
||||
|
||||
def test_local_ip_association_list(self):
|
||||
arglist = [
|
||||
self.local_ip.id
|
||||
]
|
||||
verifylist = [
|
||||
('local_ip', self.local_ip.id)
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.local_ip_associations.assert_called_once_with(
|
||||
self.local_ip,
|
||||
**{}
|
||||
)
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertEqual(set(self.data), set(list(data)))
|
||||
|
||||
def test_local_ip_association_list_all_options(self):
|
||||
arglist = [
|
||||
'--fixed-port', self.local_ip_associations[0].fixed_port_id,
|
||||
'--fixed-ip', self.local_ip_associations[0].fixed_ip,
|
||||
'--host', self.local_ip_associations[0].host,
|
||||
self.local_ip_associations[0].local_ip_id
|
||||
]
|
||||
|
||||
verifylist = [
|
||||
('fixed_port', self.local_ip_associations[0].fixed_port_id),
|
||||
('fixed_ip', self.local_ip_associations[0].fixed_ip),
|
||||
('host', self.local_ip_associations[0].host),
|
||||
('local_ip', self.local_ip_associations[0].local_ip_id),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {
|
||||
'fixed_port_id': self.local_ip_associations[0].fixed_port_id,
|
||||
'fixed_ip': self.local_ip_associations[0].fixed_ip,
|
||||
'host': self.local_ip_associations[0].host,
|
||||
}
|
||||
|
||||
self.network.local_ip_associations.assert_called_once_with(
|
||||
self.local_ip,
|
||||
**attrs
|
||||
)
|
||||
self.assertEqual(set(self.columns), set(columns))
|
||||
self.assertEqual(set(self.data), set(list(data)))
|
@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- Add ``local ip create``, ``local ip delete``,
|
||||
``local ip list``, ``local ip set``, ``local ip show``,
|
||||
``local ip association create``, ``local ip association delete``
|
||||
and ``local ip association list`` commands to support Neutron Local IP
|
||||
CRUD operations.
|
||||
[`bug 1930200 <https://bugs.launchpad.net/neutron/+bug/1930200>`_]
|
@ -5,7 +5,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
|
||||
cliff>=3.5.0 # Apache-2.0
|
||||
iso8601>=0.1.11 # MIT
|
||||
openstacksdk>=0.56.0 # Apache-2.0
|
||||
openstacksdk>=0.61.0 # Apache-2.0
|
||||
osc-lib>=2.3.0 # Apache-2.0
|
||||
oslo.i18n>=3.15.3 # Apache-2.0
|
||||
oslo.utils>=3.33.0 # Apache-2.0
|
||||
|
10
setup.cfg
10
setup.cfg
@ -415,6 +415,16 @@ openstack.network.v2 =
|
||||
ip_availability_list = openstackclient.network.v2.ip_availability:ListIPAvailability
|
||||
ip_availability_show = openstackclient.network.v2.ip_availability:ShowIPAvailability
|
||||
|
||||
local_ip_create = openstackclient.network.v2.local_ip:CreateLocalIP
|
||||
local_ip_delete = openstackclient.network.v2.local_ip:DeleteLocalIP
|
||||
local_ip_list = openstackclient.network.v2.local_ip:ListLocalIP
|
||||
local_ip_set = openstackclient.network.v2.local_ip:SetLocalIP
|
||||
local_ip_show = openstackclient.network.v2.local_ip:ShowLocalIP
|
||||
|
||||
local_ip_association_create = openstackclient.network.v2.local_ip_association:CreateLocalIPAssociation
|
||||
local_ip_association_delete = openstackclient.network.v2.local_ip_association:DeleteLocalIPAssociation
|
||||
local_ip_association_list = openstackclient.network.v2.local_ip_association:ListLocalIPAssociation
|
||||
|
||||
network_agent_add_network = openstackclient.network.v2.network_agent:AddNetworkToAgent
|
||||
network_agent_add_router = openstackclient.network.v2.network_agent:AddRouterToAgent
|
||||
network_agent_delete = openstackclient.network.v2.network_agent:DeleteNetworkAgent
|
||||
|
Loading…
x
Reference in New Issue
Block a user