Merge "Refactor security group rule create to use SDK"
This commit is contained in:
commit
d5596862b1
@ -32,7 +32,8 @@ Create a new security group rule
|
||||
|
||||
.. option:: --dst-port <port-range>
|
||||
|
||||
Destination port, may be a range: 137:139 (default: 0; only required for proto tcp and udp)
|
||||
Destination port, may be a single port or port range: 137:139
|
||||
(only required for IP protocols tcp and udp)
|
||||
|
||||
.. describe:: <group>
|
||||
|
||||
|
@ -16,15 +16,12 @@
|
||||
|
||||
"""Compute v2 Security Group action implementations"""
|
||||
|
||||
import six
|
||||
|
||||
try:
|
||||
from novaclient.v2 import security_group_rules
|
||||
except ImportError:
|
||||
from novaclient.v1_1 import security_group_rules
|
||||
|
||||
from openstackclient.common import command
|
||||
from openstackclient.common import parseractions
|
||||
from openstackclient.common import utils
|
||||
|
||||
|
||||
@ -56,68 +53,6 @@ def _xform_security_group_rule(sgroup):
|
||||
return info
|
||||
|
||||
|
||||
class CreateSecurityGroupRule(command.ShowOne):
|
||||
"""Create a new security group rule"""
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateSecurityGroupRule, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'group',
|
||||
metavar='<group>',
|
||||
help='Create rule in this security group (name or ID)',
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proto",
|
||||
metavar="<proto>",
|
||||
default="tcp",
|
||||
help="IP protocol (icmp, tcp, udp; default: tcp)",
|
||||
)
|
||||
source_group = parser.add_mutually_exclusive_group()
|
||||
source_group.add_argument(
|
||||
"--src-ip",
|
||||
metavar="<ip-address>",
|
||||
default="0.0.0.0/0",
|
||||
help="Source IP address block (may use CIDR notation; default: "
|
||||
"0.0.0.0/0)",
|
||||
)
|
||||
source_group.add_argument(
|
||||
"--src-group",
|
||||
metavar="<group>",
|
||||
help="Source security group (ID only)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dst-port",
|
||||
metavar="<port-range>",
|
||||
default=(0, 0),
|
||||
action=parseractions.RangeAction,
|
||||
help="Destination port, may be a range: 137:139 (default: 0; "
|
||||
"only required for proto tcp and udp)",
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
compute_client = self.app.client_manager.compute
|
||||
group = utils.find_resource(
|
||||
compute_client.security_groups,
|
||||
parsed_args.group,
|
||||
)
|
||||
if parsed_args.proto.lower() == 'icmp':
|
||||
from_port, to_port = -1, -1
|
||||
else:
|
||||
from_port, to_port = parsed_args.dst_port
|
||||
data = compute_client.security_group_rules.create(
|
||||
group.id,
|
||||
parsed_args.proto,
|
||||
from_port,
|
||||
to_port,
|
||||
parsed_args.src_ip,
|
||||
parsed_args.src_group,
|
||||
)
|
||||
|
||||
info = _xform_security_group_rule(data._info)
|
||||
return zip(*sorted(six.iteritems(info)))
|
||||
|
||||
|
||||
class ListSecurityGroupRule(command.Lister):
|
||||
"""List security group rules"""
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
import six
|
||||
|
||||
from openstackclient.common import exceptions
|
||||
from openstackclient.common import parseractions
|
||||
from openstackclient.common import utils
|
||||
from openstackclient.network import common
|
||||
from openstackclient.network import utils as network_utils
|
||||
@ -34,6 +35,105 @@ def _get_columns(item):
|
||||
return tuple(sorted(columns))
|
||||
|
||||
|
||||
def _convert_to_lowercase(string):
|
||||
return string.lower()
|
||||
|
||||
|
||||
class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
|
||||
"""Create a new security group rule"""
|
||||
|
||||
def update_parser_common(self, parser):
|
||||
parser.add_argument(
|
||||
'group',
|
||||
metavar='<group>',
|
||||
help='Create rule in this security group (name or ID)',
|
||||
)
|
||||
# TODO(rtheis): Add support for additional protocols for network.
|
||||
# Until then, continue enforcing the compute choices.
|
||||
parser.add_argument(
|
||||
"--proto",
|
||||
metavar="<proto>",
|
||||
default="tcp",
|
||||
choices=['icmp', 'tcp', 'udp'],
|
||||
type=_convert_to_lowercase,
|
||||
help="IP protocol (icmp, tcp, udp; default: tcp)",
|
||||
)
|
||||
source_group = parser.add_mutually_exclusive_group()
|
||||
source_group.add_argument(
|
||||
"--src-ip",
|
||||
metavar="<ip-address>",
|
||||
default="0.0.0.0/0",
|
||||
help="Source IP address block (may use CIDR notation; default: "
|
||||
"0.0.0.0/0)",
|
||||
)
|
||||
source_group.add_argument(
|
||||
"--src-group",
|
||||
metavar="<group>",
|
||||
help="Source security group (ID only)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dst-port",
|
||||
metavar="<port-range>",
|
||||
default=(0, 0),
|
||||
action=parseractions.RangeAction,
|
||||
help="Destination port, may be a single port or port range: "
|
||||
"137:139 (only required for IP protocols tcp and udp)",
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action_network(self, client, parsed_args):
|
||||
# Get the security group ID to hold the rule.
|
||||
security_group_id = client.find_security_group(
|
||||
parsed_args.group,
|
||||
ignore_missing=False
|
||||
).id
|
||||
|
||||
# Build the create attributes.
|
||||
attrs = {}
|
||||
# TODO(rtheis): Add --direction option. Until then, continue
|
||||
# with the default of 'ingress'.
|
||||
attrs['direction'] = 'ingress'
|
||||
# TODO(rtheis): Add --ethertype option. Until then, continue
|
||||
# with the default of 'IPv4'
|
||||
attrs['ethertype'] = 'IPv4'
|
||||
# TODO(rtheis): Add port range support (type and code) for icmp
|
||||
# protocol. Until then, continue ignoring the port range.
|
||||
if parsed_args.proto != 'icmp':
|
||||
attrs['port_range_min'] = parsed_args.dst_port[0]
|
||||
attrs['port_range_max'] = parsed_args.dst_port[1]
|
||||
attrs['protocol'] = parsed_args.proto
|
||||
if parsed_args.src_group is not None:
|
||||
attrs['remote_group_id'] = parsed_args.src_group
|
||||
else:
|
||||
attrs['remote_ip_prefix'] = parsed_args.src_ip
|
||||
attrs['security_group_id'] = security_group_id
|
||||
|
||||
# Create and show the security group rule.
|
||||
obj = client.create_security_group_rule(**attrs)
|
||||
columns = _get_columns(obj)
|
||||
data = utils.get_item_properties(obj, columns)
|
||||
return (columns, data)
|
||||
|
||||
def take_action_compute(self, client, parsed_args):
|
||||
group = utils.find_resource(
|
||||
client.security_groups,
|
||||
parsed_args.group,
|
||||
)
|
||||
if parsed_args.proto == 'icmp':
|
||||
from_port, to_port = -1, -1
|
||||
else:
|
||||
from_port, to_port = parsed_args.dst_port
|
||||
obj = client.security_group_rules.create(
|
||||
group.id,
|
||||
parsed_args.proto,
|
||||
from_port,
|
||||
to_port,
|
||||
parsed_args.src_ip,
|
||||
parsed_args.src_group,
|
||||
)
|
||||
return _format_security_group_rule_show(obj._info)
|
||||
|
||||
|
||||
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
|
||||
"""Delete a security group rule"""
|
||||
|
||||
|
@ -393,13 +393,13 @@ class FakeSecurityGroupRule(object):
|
||||
|
||||
# Set default attributes.
|
||||
security_group_rule_attrs = {
|
||||
'from_port': -1,
|
||||
'from_port': 0,
|
||||
'group': {},
|
||||
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
|
||||
'ip_protocol': 'icmp',
|
||||
'ip_protocol': 'tcp',
|
||||
'ip_range': {'cidr': '0.0.0.0/0'},
|
||||
'parent_group_id': 'security-group-id-' + uuid.uuid4().hex,
|
||||
'to_port': -1,
|
||||
'to_port': 0,
|
||||
}
|
||||
|
||||
# Overwrite default attributes.
|
||||
|
@ -17,7 +17,6 @@ from openstackclient.compute.v2 import security_group
|
||||
from openstackclient.tests.compute.v2 import fakes as compute_fakes
|
||||
from openstackclient.tests import fakes
|
||||
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
|
||||
from openstackclient.tests import utils
|
||||
|
||||
|
||||
security_group_id = '11'
|
||||
@ -111,271 +110,6 @@ class TestSecurityGroupRule(compute_fakes.TestComputev2):
|
||||
self.sg_rules_mock.reset_mock()
|
||||
|
||||
|
||||
class TestSecurityGroupRuleCreate(TestSecurityGroupRule):
|
||||
|
||||
columns = (
|
||||
'id',
|
||||
'ip_protocol',
|
||||
'ip_range',
|
||||
'parent_group_id',
|
||||
'port_range',
|
||||
'remote_security_group',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestSecurityGroupRuleCreate, self).setUp()
|
||||
|
||||
self.secgroups_mock.get.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
copy.deepcopy(SECURITY_GROUP),
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = security_group.CreateSecurityGroupRule(self.app, None)
|
||||
|
||||
def test_security_group_rule_create_no_options(self):
|
||||
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
copy.deepcopy(SECURITY_GROUP_RULE),
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
arglist = [
|
||||
security_group_name,
|
||||
]
|
||||
verifylist = [
|
||||
('group', security_group_name),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
# In base command class ShowOne in cliff, abstract method take_action()
|
||||
# returns a two-part tuple with a tuple of column names and a tuple of
|
||||
# data to be shown.
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
# SecurityGroupManager.create(name, description)
|
||||
self.sg_rules_mock.create.assert_called_with(
|
||||
security_group_id,
|
||||
'tcp',
|
||||
0,
|
||||
0,
|
||||
security_group_rule_cidr,
|
||||
None,
|
||||
)
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
datalist = (
|
||||
security_group_rule_id,
|
||||
'tcp',
|
||||
security_group_rule_cidr,
|
||||
security_group_id,
|
||||
'0:0',
|
||||
'',
|
||||
)
|
||||
self.assertEqual(datalist, data)
|
||||
|
||||
def test_security_group_rule_create_ftp(self):
|
||||
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
|
||||
sg_rule['from_port'] = 20
|
||||
sg_rule['to_port'] = 21
|
||||
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
sg_rule,
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
arglist = [
|
||||
security_group_name,
|
||||
'--dst-port', '20:21',
|
||||
]
|
||||
verifylist = [
|
||||
('group', security_group_name),
|
||||
('dst_port', (20, 21)),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
# In base command class ShowOne in cliff, abstract method take_action()
|
||||
# returns a two-part tuple with a tuple of column names and a tuple of
|
||||
# data to be shown.
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
# SecurityGroupManager.create(name, description)
|
||||
self.sg_rules_mock.create.assert_called_with(
|
||||
security_group_id,
|
||||
'tcp',
|
||||
20,
|
||||
21,
|
||||
security_group_rule_cidr,
|
||||
None,
|
||||
)
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
datalist = (
|
||||
security_group_rule_id,
|
||||
'tcp',
|
||||
security_group_rule_cidr,
|
||||
security_group_id,
|
||||
'20:21',
|
||||
'',
|
||||
)
|
||||
self.assertEqual(datalist, data)
|
||||
|
||||
def test_security_group_rule_create_ssh(self):
|
||||
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
|
||||
sg_rule['from_port'] = 22
|
||||
sg_rule['to_port'] = 22
|
||||
sg_rule['ip_range'] = {}
|
||||
sg_rule['group'] = {'name': security_group_name}
|
||||
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
sg_rule,
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
arglist = [
|
||||
security_group_name,
|
||||
'--dst-port', '22',
|
||||
'--src-group', security_group_id,
|
||||
]
|
||||
verifylist = [
|
||||
('group', security_group_name),
|
||||
('dst_port', (22, 22)),
|
||||
('src_group', security_group_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
# In base command class ShowOne in cliff, abstract method take_action()
|
||||
# returns a two-part tuple with a tuple of column names and a tuple of
|
||||
# data to be shown.
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
# SecurityGroupManager.create(name, description)
|
||||
self.sg_rules_mock.create.assert_called_with(
|
||||
security_group_id,
|
||||
'tcp',
|
||||
22,
|
||||
22,
|
||||
security_group_rule_cidr,
|
||||
security_group_id,
|
||||
)
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
datalist = (
|
||||
security_group_rule_id,
|
||||
'tcp',
|
||||
'',
|
||||
security_group_id,
|
||||
'22:22',
|
||||
security_group_name,
|
||||
)
|
||||
self.assertEqual(datalist, data)
|
||||
|
||||
def test_security_group_rule_create_udp(self):
|
||||
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
|
||||
sg_rule['ip_protocol'] = 'udp'
|
||||
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
sg_rule,
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
arglist = [
|
||||
security_group_name,
|
||||
'--proto', 'udp',
|
||||
]
|
||||
verifylist = [
|
||||
('group', security_group_name),
|
||||
('proto', 'udp'),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
# In base command class ShowOne in cliff, abstract method take_action()
|
||||
# returns a two-part tuple with a tuple of column names and a tuple of
|
||||
# data to be shown.
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
# SecurityGroupManager.create(name, description)
|
||||
self.sg_rules_mock.create.assert_called_with(
|
||||
security_group_id,
|
||||
'udp',
|
||||
0,
|
||||
0,
|
||||
security_group_rule_cidr,
|
||||
None,
|
||||
)
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
datalist = (
|
||||
security_group_rule_id,
|
||||
'udp',
|
||||
security_group_rule_cidr,
|
||||
security_group_id,
|
||||
'0:0',
|
||||
'',
|
||||
)
|
||||
self.assertEqual(datalist, data)
|
||||
|
||||
def test_security_group_rule_create_icmp(self):
|
||||
sg_rule_cidr = '10.0.2.0/24'
|
||||
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE_ICMP)
|
||||
sg_rule['ip_range'] = {'cidr': sg_rule_cidr}
|
||||
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
|
||||
None,
|
||||
sg_rule,
|
||||
loaded=True,
|
||||
)
|
||||
|
||||
arglist = [
|
||||
security_group_name,
|
||||
'--proto', 'ICMP',
|
||||
'--src-ip', sg_rule_cidr,
|
||||
]
|
||||
verifylist = [
|
||||
('group', security_group_name),
|
||||
('proto', 'ICMP'),
|
||||
('src_ip', sg_rule_cidr)
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
# In base command class ShowOne in cliff, abstract method take_action()
|
||||
# returns a two-part tuple with a tuple of column names and a tuple of
|
||||
# data to be shown.
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
# SecurityGroupManager.create(name, description)
|
||||
self.sg_rules_mock.create.assert_called_with(
|
||||
security_group_id,
|
||||
'ICMP',
|
||||
-1,
|
||||
-1,
|
||||
sg_rule_cidr,
|
||||
None,
|
||||
)
|
||||
|
||||
self.assertEqual(self.columns, columns)
|
||||
datalist = (
|
||||
security_group_rule_id,
|
||||
'icmp',
|
||||
sg_rule_cidr,
|
||||
security_group_id,
|
||||
'',
|
||||
'',
|
||||
)
|
||||
self.assertEqual(datalist, data)
|
||||
|
||||
def test_security_group_rule_create_src_invalid(self):
|
||||
arglist = [
|
||||
security_group_name,
|
||||
'--proto', 'ICMP',
|
||||
'--src-ip', security_group_rule_cidr,
|
||||
'--src-group', security_group_id,
|
||||
]
|
||||
|
||||
self.assertRaises(utils.ParserException,
|
||||
self.check_parser, self.cmd, arglist, [])
|
||||
|
||||
|
||||
class TestSecurityGroupRuleList(TestSecurityGroupRule):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -510,11 +510,11 @@ class FakeSecurityGroupRule(object):
|
||||
'direction': 'ingress',
|
||||
'ethertype': 'IPv4',
|
||||
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
|
||||
'port_range_max': None,
|
||||
'port_range_min': None,
|
||||
'protocol': None,
|
||||
'remote_group_id': 'remote-security-group-id-' + uuid.uuid4().hex,
|
||||
'remote_ip_prefix': None,
|
||||
'port_range_max': 0,
|
||||
'port_range_min': 0,
|
||||
'protocol': 'tcp',
|
||||
'remote_group_id': None,
|
||||
'remote_ip_prefix': '0.0.0.0/0',
|
||||
'security_group_id': 'security-group-id-' + uuid.uuid4().hex,
|
||||
'tenant_id': 'project-id-' + uuid.uuid4().hex,
|
||||
}
|
||||
|
@ -39,6 +39,317 @@ class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
|
||||
self.compute = self.app.client_manager.compute
|
||||
|
||||
|
||||
class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||
|
||||
# The security group rule to be created.
|
||||
_security_group_rule = None
|
||||
|
||||
# The security group that will contain the rule created.
|
||||
_security_group = \
|
||||
network_fakes.FakeSecurityGroup.create_one_security_group()
|
||||
|
||||
expected_columns = (
|
||||
'direction',
|
||||
'ethertype',
|
||||
'id',
|
||||
'port_range_max',
|
||||
'port_range_min',
|
||||
'project_id',
|
||||
'protocol',
|
||||
'remote_group_id',
|
||||
'remote_ip_prefix',
|
||||
'security_group_id',
|
||||
)
|
||||
|
||||
expected_data = None
|
||||
|
||||
def _setup_security_group_rule(self, attrs=None):
|
||||
self._security_group_rule = \
|
||||
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
|
||||
attrs)
|
||||
self.network.create_security_group_rule = mock.Mock(
|
||||
return_value=self._security_group_rule)
|
||||
self.expected_data = (
|
||||
self._security_group_rule.direction,
|
||||
self._security_group_rule.ethertype,
|
||||
self._security_group_rule.id,
|
||||
self._security_group_rule.port_range_max,
|
||||
self._security_group_rule.port_range_min,
|
||||
self._security_group_rule.project_id,
|
||||
self._security_group_rule.protocol,
|
||||
self._security_group_rule.remote_group_id,
|
||||
self._security_group_rule.remote_ip_prefix,
|
||||
self._security_group_rule.security_group_id,
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSecurityGroupRuleNetwork, self).setUp()
|
||||
|
||||
self.network.find_security_group = mock.Mock(
|
||||
return_value=self._security_group)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = security_group_rule.CreateSecurityGroupRule(
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_create_no_options(self):
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, [], [])
|
||||
|
||||
def test_create_source_group_and_ip(self):
|
||||
arglist = [
|
||||
'--src-ip', '10.10.0.0/24',
|
||||
'--src-group', self._security_group.id,
|
||||
self._security_group.id,
|
||||
]
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, arglist, [])
|
||||
|
||||
def test_create_bad_protocol(self):
|
||||
arglist = [
|
||||
'--protocol', 'foo',
|
||||
self._security_group.id,
|
||||
]
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, arglist, [])
|
||||
|
||||
def test_create_default_rule(self):
|
||||
self._setup_security_group_rule({
|
||||
'port_range_max': 443,
|
||||
'port_range_min': 443,
|
||||
})
|
||||
arglist = [
|
||||
'--dst-port', str(self._security_group_rule.port_range_min),
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.create_security_group_rule.assert_called_once_with(**{
|
||||
'direction': self._security_group_rule.direction,
|
||||
'ethertype': self._security_group_rule.ethertype,
|
||||
'port_range_max': self._security_group_rule.port_range_max,
|
||||
'port_range_min': self._security_group_rule.port_range_min,
|
||||
'protocol': self._security_group_rule.protocol,
|
||||
'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
|
||||
'security_group_id': self._security_group.id,
|
||||
})
|
||||
self.assertEqual(tuple(self.expected_columns), columns)
|
||||
self.assertEqual(self.expected_data, data)
|
||||
|
||||
def test_create_source_group(self):
|
||||
self._setup_security_group_rule({
|
||||
'port_range_max': 22,
|
||||
'port_range_min': 22,
|
||||
'remote_group_id': self._security_group.id,
|
||||
})
|
||||
arglist = [
|
||||
'--dst-port', str(self._security_group_rule.port_range_min),
|
||||
'--src-group', self._security_group.id,
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('dst_port', (self._security_group_rule.port_range_min,
|
||||
self._security_group_rule.port_range_max)),
|
||||
('src_group', self._security_group.id),
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.create_security_group_rule.assert_called_once_with(**{
|
||||
'direction': self._security_group_rule.direction,
|
||||
'ethertype': self._security_group_rule.ethertype,
|
||||
'port_range_max': self._security_group_rule.port_range_max,
|
||||
'port_range_min': self._security_group_rule.port_range_min,
|
||||
'protocol': self._security_group_rule.protocol,
|
||||
'remote_group_id': self._security_group_rule.remote_group_id,
|
||||
'security_group_id': self._security_group.id,
|
||||
})
|
||||
self.assertEqual(tuple(self.expected_columns), columns)
|
||||
self.assertEqual(self.expected_data, data)
|
||||
|
||||
def test_create_source_ip(self):
|
||||
self._setup_security_group_rule({
|
||||
'protocol': 'icmp',
|
||||
'port_range_max': -1,
|
||||
'port_range_min': -1,
|
||||
'remote_ip_prefix': '10.0.2.0/24',
|
||||
})
|
||||
arglist = [
|
||||
'--proto', self._security_group_rule.protocol,
|
||||
'--src-ip', self._security_group_rule.remote_ip_prefix,
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('proto', self._security_group_rule.protocol),
|
||||
('src_ip', self._security_group_rule.remote_ip_prefix),
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.create_security_group_rule.assert_called_once_with(**{
|
||||
'direction': self._security_group_rule.direction,
|
||||
'ethertype': self._security_group_rule.ethertype,
|
||||
'protocol': self._security_group_rule.protocol,
|
||||
'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
|
||||
'security_group_id': self._security_group.id,
|
||||
})
|
||||
self.assertEqual(tuple(self.expected_columns), columns)
|
||||
self.assertEqual(self.expected_data, data)
|
||||
|
||||
|
||||
class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||
|
||||
# The security group rule to be created.
|
||||
_security_group_rule = None
|
||||
|
||||
# The security group that will contain the rule created.
|
||||
_security_group = \
|
||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||
|
||||
def _setup_security_group_rule(self, attrs=None):
|
||||
self._security_group_rule = \
|
||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
|
||||
attrs)
|
||||
self.compute.security_group_rules.create.return_value = \
|
||||
self._security_group_rule
|
||||
expected_columns, expected_data = \
|
||||
security_group_rule._format_security_group_rule_show(
|
||||
self._security_group_rule._info)
|
||||
return expected_columns, expected_data
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSecurityGroupRuleCompute, self).setUp()
|
||||
|
||||
self.app.client_manager.network_endpoint_enabled = False
|
||||
|
||||
self.compute.security_groups.get.return_value = self._security_group
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
|
||||
|
||||
def test_create_no_options(self):
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, [], [])
|
||||
|
||||
def test_create_source_group_and_ip(self):
|
||||
arglist = [
|
||||
'--src-ip', '10.10.0.0/24',
|
||||
'--src-group', self._security_group.id,
|
||||
self._security_group.id,
|
||||
]
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, arglist, [])
|
||||
|
||||
def test_create_bad_protocol(self):
|
||||
arglist = [
|
||||
'--protocol', 'foo',
|
||||
self._security_group.id,
|
||||
]
|
||||
self.assertRaises(tests_utils.ParserException,
|
||||
self.check_parser, self.cmd, arglist, [])
|
||||
|
||||
def test_create_default_rule(self):
|
||||
expected_columns, expected_data = self._setup_security_group_rule()
|
||||
dst_port = str(self._security_group_rule.from_port) + ':' + \
|
||||
str(self._security_group_rule.to_port)
|
||||
arglist = [
|
||||
'--dst-port', dst_port,
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('dst_port', (self._security_group_rule.from_port,
|
||||
self._security_group_rule.to_port)),
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.compute.security_group_rules.create.assert_called_once_with(
|
||||
self._security_group.id,
|
||||
self._security_group_rule.ip_protocol,
|
||||
self._security_group_rule.from_port,
|
||||
self._security_group_rule.to_port,
|
||||
self._security_group_rule.ip_range['cidr'],
|
||||
None,
|
||||
)
|
||||
self.assertEqual(expected_columns, columns)
|
||||
self.assertEqual(expected_data, data)
|
||||
|
||||
def test_create_source_group(self):
|
||||
expected_columns, expected_data = self._setup_security_group_rule({
|
||||
'from_port': 22,
|
||||
'to_port': 22,
|
||||
'group': {'name': self._security_group.id},
|
||||
})
|
||||
arglist = [
|
||||
'--dst-port', str(self._security_group_rule.from_port),
|
||||
'--src-group', self._security_group.id,
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('dst_port', (self._security_group_rule.from_port,
|
||||
self._security_group_rule.to_port)),
|
||||
('src_group', self._security_group.id),
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.compute.security_group_rules.create.assert_called_once_with(
|
||||
self._security_group.id,
|
||||
self._security_group_rule.ip_protocol,
|
||||
self._security_group_rule.from_port,
|
||||
self._security_group_rule.to_port,
|
||||
self._security_group_rule.ip_range['cidr'],
|
||||
self._security_group.id,
|
||||
)
|
||||
self.assertEqual(expected_columns, columns)
|
||||
self.assertEqual(expected_data, data)
|
||||
|
||||
def test_create_source_ip(self):
|
||||
expected_columns, expected_data = self._setup_security_group_rule({
|
||||
'ip_protocol': 'icmp',
|
||||
'from_port': -1,
|
||||
'to_port': -1,
|
||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||
})
|
||||
arglist = [
|
||||
'--proto', self._security_group_rule.ip_protocol,
|
||||
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
||||
self._security_group.id,
|
||||
]
|
||||
verifylist = [
|
||||
('proto', self._security_group_rule.ip_protocol),
|
||||
('src_ip', self._security_group_rule.ip_range['cidr']),
|
||||
('group', self._security_group.id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.compute.security_group_rules.create.assert_called_once_with(
|
||||
self._security_group.id,
|
||||
self._security_group_rule.ip_protocol,
|
||||
self._security_group_rule.from_port,
|
||||
self._security_group_rule.to_port,
|
||||
self._security_group_rule.ip_range['cidr'],
|
||||
None,
|
||||
)
|
||||
self.assertEqual(expected_columns, columns)
|
||||
self.assertEqual(expected_data, data)
|
||||
|
||||
|
||||
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||
|
||||
# The security group rule to be deleted.
|
||||
@ -68,7 +379,7 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.delete_security_group_rule.assert_called_with(
|
||||
self.network.delete_security_group_rule.assert_called_once_with(
|
||||
self._security_group_rule)
|
||||
self.assertIsNone(result)
|
||||
|
||||
@ -98,7 +409,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.compute.security_group_rules.delete.assert_called_with(
|
||||
self.compute.security_group_rules.delete.assert_called_once_with(
|
||||
self._security_group_rule.id)
|
||||
self.assertIsNone(result)
|
||||
|
||||
@ -160,7 +471,7 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.network.find_security_group_rule.assert_called_with(
|
||||
self.network.find_security_group_rule.assert_called_once_with(
|
||||
self._security_group_rule.id, ignore_missing=False)
|
||||
self.assertEqual(tuple(self.columns), columns)
|
||||
self.assertEqual(self.data, data)
|
||||
@ -207,6 +518,6 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.compute.security_groups.list.assert_called_with()
|
||||
self.compute.security_groups.list.assert_called_once_with()
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
7
releasenotes/notes/bug-1519512-4231ac6014109142.yaml
Normal file
7
releasenotes/notes/bug-1519512-4231ac6014109142.yaml
Normal file
@ -0,0 +1,7 @@
|
||||
---
|
||||
upgrade:
|
||||
- The ``security group rule create`` command now uses Network v2
|
||||
when enabled which results in a more detailed output for network
|
||||
security group rules that matches the ``security group rule show``
|
||||
command.
|
||||
[Bug `1519512 <https://bugs.launchpad.net/bugs/1519512>`_]
|
@ -99,7 +99,6 @@ openstack.compute.v2 =
|
||||
keypair_list = openstackclient.compute.v2.keypair:ListKeypair
|
||||
keypair_show = openstackclient.compute.v2.keypair:ShowKeypair
|
||||
|
||||
security_group_rule_create = openstackclient.compute.v2.security_group:CreateSecurityGroupRule
|
||||
security_group_rule_list = openstackclient.compute.v2.security_group:ListSecurityGroupRule
|
||||
|
||||
server_add_security_group = openstackclient.compute.v2.server:AddServerSecurityGroup
|
||||
@ -349,6 +348,8 @@ openstack.network.v2 =
|
||||
security_group_list = openstackclient.network.v2.security_group:ListSecurityGroup
|
||||
security_group_set = openstackclient.network.v2.security_group:SetSecurityGroup
|
||||
security_group_show = openstackclient.network.v2.security_group:ShowSecurityGroup
|
||||
|
||||
security_group_rule_create = openstackclient.network.v2.security_group_rule:CreateSecurityGroupRule
|
||||
security_group_rule_delete = openstackclient.network.v2.security_group_rule:DeleteSecurityGroupRule
|
||||
security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user