Merge "Support bulk deletion for commands that exist in both network and compute."

This commit is contained in:
Jenkins 2016-06-15 16:54:10 +00:00 committed by Gerrit Code Review
commit d3cd322650
12 changed files with 459 additions and 72 deletions

View File

@ -66,16 +66,17 @@ Create new floating IP address
ip floating delete
------------------
Delete floating IP
Delete floating IP(s)
.. program:: ip floating delete
.. code:: bash
os ip floating delete <floating-ip>
os ip floating delete
<floating-ip> [<floating-ip> ...]
.. describe:: <floating-ip>
Floating IP to delete (IP address or ID)
Floating IP(s) to delete (IP address or ID)
ip floating list
----------------

View File

@ -104,17 +104,17 @@ Create a new security group rule
security group rule delete
--------------------------
Delete a security group rule
Delete security group rule(s)
.. program:: security group rule delete
.. code:: bash
os security group rule delete
<rule>
<rule> [<rule> ...]
.. describe:: <rule>
Security group rule to delete (ID only)
Security group rule(s) to delete (ID only)
security group rule list
------------------------

View File

@ -45,17 +45,17 @@ Create a new security group
security group delete
---------------------
Delete a security group
Delete security group(s)
.. program:: security group delete
.. code:: bash
os security group delete
<group>
<group> [<group> ...]
.. describe:: <group>
Security group to delete (name or ID)
Security group(s) to delete (name or ID)
security group list
-------------------

View File

@ -110,26 +110,28 @@ class CreateFloatingIP(common.NetworkAndComputeShowOne):
return (columns, data)
class DeleteFloatingIP(common.NetworkAndComputeCommand):
"""Delete floating IP"""
class DeleteFloatingIP(common.NetworkAndComputeDelete):
"""Delete floating IP(s)"""
# Used by base class to find resources in parsed_args.
resource = 'floating_ip'
r = None
def update_parser_common(self, parser):
parser.add_argument(
'floating_ip',
metavar="<floating-ip>",
help=_("Floating IP to delete (IP address or ID)")
nargs="+",
help=_("Floating IP(s) to delete (IP address or ID)")
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_ip(parsed_args.floating_ip)
obj = client.find_ip(self.r, ignore_missing=False)
client.delete_ip(obj)
def take_action_compute(self, client, parsed_args):
obj = utils.find_resource(
client.floating_ips,
parsed_args.floating_ip,
)
obj = utils.find_resource(client.floating_ips, self.r)
client.floating_ips.delete(obj.id)

View File

@ -164,26 +164,28 @@ class CreateSecurityGroup(common.NetworkAndComputeShowOne):
return (display_columns, data)
class DeleteSecurityGroup(common.NetworkAndComputeCommand):
"""Delete a security group"""
class DeleteSecurityGroup(common.NetworkAndComputeDelete):
"""Delete security group(s)"""
# Used by base class to find resources in parsed_args.
resource = 'group'
r = None
def update_parser_common(self, parser):
parser.add_argument(
'group',
metavar='<group>',
help=_("Security group to delete (name or ID)")
nargs="+",
help=_("Security group(s) to delete (name or ID)"),
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_security_group(parsed_args.group)
obj = client.find_security_group(self.r, ignore_missing=False)
client.delete_security_group(obj)
def take_action_compute(self, client, parsed_args):
data = utils.find_resource(
client.security_groups,
parsed_args.group,
)
data = utils.find_resource(client.security_groups, self.r)
client.security_groups.delete(data.id)

View File

@ -333,23 +333,29 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
return _format_security_group_rule_show(obj._info)
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
class DeleteSecurityGroupRule(common.NetworkAndComputeDelete):
"""Delete security group rule(s)"""
# Used by base class to find resources in parsed_args.
resource = 'rule'
r = None
def update_parser_common(self, parser):
parser.add_argument(
'rule',
metavar='<rule>',
help=_("Security group rule to delete (ID only)")
nargs="+",
help=_("Security group rule(s) to delete (ID only)")
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_security_group_rule(parsed_args.rule)
obj = client.find_security_group_rule(
self.r, ignore_missing=False)
client.delete_security_group_rule(obj)
def take_action_compute(self, client, parsed_args):
client.security_group_rules.delete(parsed_args.rule)
client.security_group_rules.delete(self.r)
class ListSecurityGroupRule(common.NetworkAndComputeLister):

View File

@ -452,6 +452,25 @@ class FakeSecurityGroup(object):
return security_groups
@staticmethod
def get_security_groups(security_groups=None, count=2):
"""Get an iterable MagicMock object with a list of faked security groups.
If security groups list is provided, then initialize the Mock object
with the list. Otherwise create one.
:param List security groups:
A list of FakeResource objects faking security groups
:param int count:
The number of security groups to fake
:return:
An iterable Mock object with side_effect set to a list of faked
security groups
"""
if security_groups is None:
security_groups = FakeSecurityGroup.create_security_groups(count)
return mock.MagicMock(side_effect=security_groups)
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""

View File

@ -611,6 +611,25 @@ class FakeSecurityGroup(object):
return security_groups
@staticmethod
def get_security_groups(security_groups=None, count=2):
"""Get an iterable MagicMock object with a list of faked security groups.
If security groups list is provided, then initialize the Mock object
with the list. Otherwise create one.
:param List security groups:
A list of FakeResource objects faking security groups
:param int count:
The number of security groups to fake
:return:
An iterable Mock object with side_effect set to a list of faked
security groups
"""
if security_groups is None:
security_groups = FakeSecurityGroup.create_security_groups(count)
return mock.MagicMock(side_effect=security_groups)
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""
@ -670,6 +689,26 @@ class FakeSecurityGroupRule(object):
return security_group_rules
@staticmethod
def get_security_group_rules(security_group_rules=None, count=2):
"""Get an iterable MagicMock object with a list of faked security group rules.
If security group rules list is provided, then initialize the Mock
object with the list. Otherwise create one.
:param List security group rules:
A list of FakeResource objects faking security group rules
:param int count:
The number of security group rules to fake
:return:
An iterable Mock object with side_effect set to a list of faked
security group rules
"""
if security_group_rules is None:
security_group_rules = (
FakeSecurityGroupRule.create_security_group_rules(count))
return mock.MagicMock(side_effect=security_group_rules)
class FakeSubnet(object):
"""Fake one or more subnets."""

View File

@ -12,6 +12,9 @@
#
import mock
from mock import call
from osc_lib import exceptions
from openstackclient.network.v2 import floating_ip
from openstackclient.tests.compute.v2 import fakes as compute_fakes
@ -140,33 +143,84 @@ class TestCreateFloatingIPNetwork(TestFloatingIPNetwork):
class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
# The floating ip to be deleted.
floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip()
# The floating ips to be deleted.
floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2)
def setUp(self):
super(TestDeleteFloatingIPNetwork, self).setUp()
self.network.delete_ip = mock.Mock(return_value=None)
self.network.find_ip = mock.Mock(return_value=self.floating_ip)
self.network.find_ip = (
network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
# Get the command object to test
self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace)
def test_floating_ip_delete(self):
arglist = [
self.floating_ip.id,
self.floating_ips[0].id,
]
verifylist = [
('floating_ip', self.floating_ip.id),
('floating_ip', [self.floating_ips[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.find_ip.assert_called_once_with(self.floating_ip.id)
self.network.delete_ip.assert_called_once_with(self.floating_ip)
self.network.find_ip.assert_called_once_with(
self.floating_ips[0].id, ignore_missing=False)
self.network.delete_ip.assert_called_once_with(self.floating_ips[0])
self.assertIsNone(result)
def test_multi_floating_ips_delete(self):
arglist = []
verifylist = []
for f in self.floating_ips:
arglist.append(f.id)
verifylist = [
('floating_ip', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for f in self.floating_ips:
calls.append(call(f))
self.network.delete_ip.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_floating_ips_delete_with_exception(self):
arglist = [
self.floating_ips[0].id,
'unexist_floating_ip',
]
verifylist = [
('floating_ip',
[self.floating_ips[0].id, 'unexist_floating_ip']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [self.floating_ips[0], exceptions.CommandError]
self.network.find_ip = (
mock.MagicMock(side_effect=find_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 floating_ip failed to delete.', str(e))
self.network.find_ip.assert_any_call(
self.floating_ips[0].id, ignore_missing=False)
self.network.find_ip.assert_any_call(
'unexist_floating_ip', ignore_missing=False)
self.network.delete_ip.assert_called_once_with(
self.floating_ips[0]
)
class TestListFloatingIPNetwork(TestFloatingIPNetwork):
@ -335,8 +389,8 @@ class TestCreateFloatingIPCompute(TestFloatingIPCompute):
class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
# The floating ip to be deleted.
floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
# The floating ips to be deleted.
floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
def setUp(self):
super(TestDeleteFloatingIPCompute, self).setUp()
@ -346,27 +400,78 @@ class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
self.compute.floating_ips.delete.return_value = None
# Return value of utils.find_resource()
self.compute.floating_ips.get.return_value = self.floating_ip
self.compute.floating_ips.get = (
compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
# Get the command object to test
self.cmd = floating_ip.DeleteFloatingIP(self.app, None)
def test_floating_ip_delete(self):
arglist = [
self.floating_ip.id,
self.floating_ips[0].id,
]
verifylist = [
('floating_ip', self.floating_ip.id),
('floating_ip', [self.floating_ips[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.floating_ips.delete.assert_called_once_with(
self.floating_ip.id
self.floating_ips[0].id
)
self.assertIsNone(result)
def test_multi_floating_ips_delete(self):
arglist = []
verifylist = []
for f in self.floating_ips:
arglist.append(f.id)
verifylist = [
('floating_ip', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for f in self.floating_ips:
calls.append(call(f.id))
self.compute.floating_ips.delete.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_floating_ips_delete_with_exception(self):
arglist = [
self.floating_ips[0].id,
'unexist_floating_ip',
]
verifylist = [
('floating_ip',
[self.floating_ips[0].id, 'unexist_floating_ip']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [self.floating_ips[0], exceptions.CommandError]
self.compute.floating_ips.get = (
mock.MagicMock(side_effect=find_mock_result)
)
self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 floating_ip failed to delete.', str(e))
self.compute.floating_ips.get.assert_any_call(
self.floating_ips[0].id)
self.compute.floating_ips.get.assert_any_call(
'unexist_floating_ip')
self.compute.floating_ips.delete.assert_called_once_with(
self.floating_ips[0].id
)
class TestListFloatingIPCompute(TestFloatingIPCompute):

View File

@ -13,6 +13,9 @@
import copy
import mock
from mock import call
from osc_lib import exceptions
from openstackclient.network.v2 import security_group
from openstackclient.tests.compute.v2 import fakes as compute_fakes
@ -227,42 +230,93 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group to be deleted.
_security_group = \
network_fakes.FakeSecurityGroup.create_one_security_group()
# The security groups to be deleted.
_security_groups = \
network_fakes.FakeSecurityGroup.create_security_groups()
def setUp(self):
super(TestDeleteSecurityGroupNetwork, self).setUp()
self.network.delete_security_group = mock.Mock(return_value=None)
self.network.find_security_group = mock.Mock(
return_value=self._security_group)
self.network.find_security_group = (
network_fakes.FakeSecurityGroup.get_security_groups(
self._security_groups)
)
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace)
def test_security_group_delete(self):
arglist = [
self._security_group.name,
self._security_groups[0].name,
]
verifylist = [
('group', self._security_group.name),
('group', [self._security_groups[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_security_group.assert_called_once_with(
self._security_group)
self._security_groups[0])
self.assertIsNone(result)
def test_multi_security_groups_delete(self):
arglist = []
verifylist = []
for s in self._security_groups:
arglist.append(s.name)
verifylist = [
('group', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_groups:
calls.append(call(s))
self.network.delete_security_group.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_security_groups_delete_with_exception(self):
arglist = [
self._security_groups[0].name,
'unexist_security_group',
]
verifylist = [
('group',
[self._security_groups[0].name, 'unexist_security_group']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [self._security_groups[0], exceptions.CommandError]
self.network.find_security_group = (
mock.MagicMock(side_effect=find_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 group failed to delete.', str(e))
self.network.find_security_group.assert_any_call(
self._security_groups[0].name, ignore_missing=False)
self.network.find_security_group.assert_any_call(
'unexist_security_group', ignore_missing=False)
self.network.delete_security_group.assert_called_once_with(
self._security_groups[0]
)
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
# The security group to be deleted.
_security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
# The security groups to be deleted.
_security_groups = \
compute_fakes.FakeSecurityGroup.create_security_groups()
def setUp(self):
super(TestDeleteSecurityGroupCompute, self).setUp()
@ -271,27 +325,80 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
self.compute.security_groups.delete = mock.Mock(return_value=None)
self.compute.security_groups.get = mock.Mock(
return_value=self._security_group)
self.compute.security_groups.get = (
compute_fakes.FakeSecurityGroup.get_security_groups(
self._security_groups)
)
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
def test_security_group_delete(self):
arglist = [
self._security_group.name,
self._security_groups[0].id,
]
verifylist = [
('group', self._security_group.name),
('group', [self._security_groups[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.security_groups.delete.assert_called_once_with(
self._security_group.id)
self._security_groups[0].id)
self.assertIsNone(result)
def test_multi_security_groups_delete(self):
arglist = []
verifylist = []
for s in self._security_groups:
arglist.append(s.id)
verifylist = [
('group', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_groups:
calls.append(call(s.id))
self.compute.security_groups.delete.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_security_groups_delete_with_exception(self):
arglist = [
self._security_groups[0].id,
'unexist_security_group',
]
verifylist = [
('group',
[self._security_groups[0].id, 'unexist_security_group']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [self._security_groups[0], exceptions.CommandError]
self.compute.security_groups.get = (
mock.MagicMock(side_effect=find_mock_result)
)
self.compute.security_groups.find.side_effect = (
exceptions.NotFound(None))
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 group failed to delete.', str(e))
self.compute.security_groups.get.assert_any_call(
self._security_groups[0].id)
self.compute.security_groups.get.assert_any_call(
'unexist_security_group')
self.compute.security_groups.delete.assert_called_once_with(
self._security_groups[0].id
)
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):

View File

@ -13,6 +13,7 @@
import copy
import mock
from mock import call
from osc_lib import exceptions
@ -668,17 +669,20 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be deleted.
_security_group_rule = \
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
# The security group rules to be deleted.
_security_group_rules = \
network_fakes.FakeSecurityGroupRule.create_security_group_rules(
count=2)
def setUp(self):
super(TestDeleteSecurityGroupRuleNetwork, self).setUp()
self.network.delete_security_group_rule = mock.Mock(return_value=None)
self.network.find_security_group_rule = mock.Mock(
return_value=self._security_group_rule)
self.network.find_security_group_rule = (
network_fakes.FakeSecurityGroupRule.get_security_group_rules(
self._security_group_rules)
)
# Get the command object to test
self.cmd = security_group_rule.DeleteSecurityGroupRule(
@ -686,25 +690,76 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
def test_security_group_rule_delete(self):
arglist = [
self._security_group_rule.id,
self._security_group_rules[0].id,
]
verifylist = [
('rule', self._security_group_rule.id),
('rule', [self._security_group_rules[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_security_group_rule.assert_called_once_with(
self._security_group_rule)
self._security_group_rules[0])
self.assertIsNone(result)
def test_multi_security_group_rules_delete(self):
arglist = []
verifylist = []
for s in self._security_group_rules:
arglist.append(s.id)
verifylist = [
('rule', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_group_rules:
calls.append(call(s))
self.network.delete_security_group_rule.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_security_group_rules_delete_with_exception(self):
arglist = [
self._security_group_rules[0].id,
'unexist_rule',
]
verifylist = [
('rule',
[self._security_group_rules[0].id, 'unexist_rule']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [
self._security_group_rules[0], exceptions.CommandError]
self.network.find_security_group_rule = (
mock.MagicMock(side_effect=find_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 rule failed to delete.', str(e))
self.network.find_security_group_rule.assert_any_call(
self._security_group_rules[0].id, ignore_missing=False)
self.network.find_security_group_rule.assert_any_call(
'unexist_rule', ignore_missing=False)
self.network.delete_security_group_rule.assert_called_once_with(
self._security_group_rules[0]
)
class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be deleted.
_security_group_rule = \
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
_security_group_rules = \
compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
count=2)
def setUp(self):
super(TestDeleteSecurityGroupRuleCompute, self).setUp()
@ -716,19 +771,65 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
def test_security_group_rule_delete(self):
arglist = [
self._security_group_rule.id,
self._security_group_rules[0].id,
]
verifylist = [
('rule', self._security_group_rule.id),
('rule', [self._security_group_rules[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.security_group_rules.delete.assert_called_once_with(
self._security_group_rule.id)
self._security_group_rules[0].id)
self.assertIsNone(result)
def test_multi_security_group_rules_delete(self):
arglist = []
verifylist = []
for s in self._security_group_rules:
arglist.append(s.id)
verifylist = [
('rule', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for s in self._security_group_rules:
calls.append(call(s.id))
self.compute.security_group_rules.delete.assert_has_calls(calls)
self.assertIsNone(result)
def test_multi_security_group_rules_delete_with_exception(self):
arglist = [
self._security_group_rules[0].id,
'unexist_rule',
]
verifylist = [
('rule',
[self._security_group_rules[0].id, 'unexist_rule']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
find_mock_result = [None, exceptions.CommandError]
self.compute.security_group_rules.delete = (
mock.MagicMock(side_effect=find_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 rule failed to delete.', str(e))
self.compute.security_group_rules.delete.assert_any_call(
self._security_group_rules[0].id)
self.compute.security_group_rules.delete.assert_any_call(
'unexist_rule')
class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):

View File

@ -0,0 +1,5 @@
---
features:
- Support bulk deletion for ``floating ip delete``, ``security group delete``,
and ``security group rule delete`` commands in networkv2.
[Blueprint `multi-argument-network <https://blueprints.launchpad.net/python-openstackclient/+spec/multi-argument-network>`_]