Merge "Add "security group rule show" command"

This commit is contained in:
Jenkins 2016-02-23 18:02:55 +00:00 committed by Gerrit Code Review
commit c2f5945ef6
7 changed files with 236 additions and 4 deletions

View File

@ -67,3 +67,18 @@ List security group rules
.. describe:: <group>
List all rules in this security group (name or ID)
security group rule show
------------------------
Display security group rule details
.. program:: security group rule show
.. code:: bash
os security group rule show
<rule>
.. describe:: <rule>
Security group rule to display (ID only)

View File

@ -57,3 +57,10 @@ class SecurityGroupRuleTests(test.TestCase):
self.SECURITY_GROUP_NAME +
opts)
self.assertIn(self.SECURITY_GROUP_RULE_ID, raw_output)
def test_security_group_rule_show(self):
opts = self.get_show_opts(self.ID_FIELD)
raw_output = self.openstack('security group rule show ' +
self.SECURITY_GROUP_RULE_ID +
opts)
self.assertEqual(self.SECURITY_GROUP_RULE_ID + "\n", raw_output)

View File

@ -13,9 +13,54 @@
"""Security Group Rule action implementations"""
import six
from openstackclient.common import exceptions
from openstackclient.common import utils
from openstackclient.network import common
def _xform_security_group_rule(sgroup):
info = {}
info.update(sgroup)
from_port = info.pop('from_port')
to_port = info.pop('to_port')
if isinstance(from_port, int) and isinstance(to_port, int):
port_range = {'port_range': "%u:%u" % (from_port, to_port)}
elif from_port is None and to_port is None:
port_range = {'port_range': ""}
else:
port_range = {'port_range': "%s:%s" % (from_port, to_port)}
info.update(port_range)
if 'cidr' in info['ip_range']:
info['ip_range'] = info['ip_range']['cidr']
else:
info['ip_range'] = ''
if info['ip_protocol'] is None:
info['ip_protocol'] = ''
elif info['ip_protocol'].lower() == 'icmp':
info['port_range'] = ''
group = info.pop('group')
if 'name' in group:
info['remote_security_group'] = group['name']
else:
info['remote_security_group'] = ''
return info
def _format_security_group_rule_show(obj):
data = _xform_security_group_rule(obj)
return zip(*sorted(six.iteritems(data)))
def _get_columns(item):
columns = item.keys()
if 'tenant_id' in columns:
columns.remove('tenant_id')
columns.append('project_id')
return tuple(sorted(columns))
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
@ -33,3 +78,44 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
def take_action_compute(self, client, parsed_args):
client.security_group_rules.delete(parsed_args.rule)
class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
"""Display security group rule details"""
def update_parser_common(self, parser):
parser.add_argument(
'rule',
metavar="<rule>",
help="Security group rule to display (ID only)"
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_security_group_rule(parsed_args.rule,
ignore_missing=False)
columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return (columns, data)
def take_action_compute(self, client, parsed_args):
# NOTE(rtheis): Unfortunately, compute does not have an API
# to get or list security group rules so parse through the
# security groups to find all accessible rules in search of
# the requested rule.
obj = None
security_group_rules = []
for security_group in client.security_groups.list():
security_group_rules.extend(security_group.rules)
for security_group_rule in security_group_rules:
if parsed_args.rule == str(security_group_rule.get('id')):
obj = security_group_rule
break
if obj is None:
msg = "Could not find security group rule " \
"with ID %s" % parsed_args.rule
raise exceptions.CommandError(msg)
# NOTE(rtheis): Format security group rule
return _format_security_group_rule_show(obj)

View File

@ -480,15 +480,13 @@ class FakeSecurityGroupRule(object):
:param Dictionary methods:
A dictionary with all methods
:return:
A FakeResource object, with id, name, etc.
A FakeResource object, with id, etc.
"""
# Set default attributes.
security_group_rule_attrs = {
'description': 'security-group-rule-desc-' + uuid.uuid4().hex,
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
'name': 'security-group-rule-name-' + uuid.uuid4().hex,
'port_range_max': None,
'port_range_min': None,
'protocol': None,
@ -502,7 +500,11 @@ class FakeSecurityGroupRule(object):
security_group_rule_attrs.update(attrs)
# Set default methods.
security_group_rule_methods = {}
security_group_rule_methods = {
'keys': ['direction', 'ethertype', 'id', 'port_range_max',
'port_range_min', 'protocol', 'remote_group_id',
'remote_ip_prefix', 'security_group_id', 'tenant_id'],
}
# Overwrite default methods.
security_group_rule_methods.update(methods)
@ -511,6 +513,10 @@ class FakeSecurityGroupRule(object):
info=copy.deepcopy(security_group_rule_attrs),
methods=copy.deepcopy(security_group_rule_methods),
loaded=True)
# Set attributes with special mappings.
security_group_rule.project_id = security_group_rule_attrs['tenant_id']
return security_group_rule
@staticmethod

View File

@ -11,11 +11,14 @@
# under the License.
#
import copy
import mock
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests import fakes
from openstackclient.tests.network.v2 import fakes as network_fakes
from openstackclient.tests import utils as tests_utils
class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
@ -98,3 +101,112 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.compute.security_group_rules.delete.assert_called_with(
self._security_group_rule.id)
self.assertIsNone(result)
class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be shown.
_security_group_rule = \
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
columns = (
'direction',
'ethertype',
'id',
'port_range_max',
'port_range_min',
'project_id',
'protocol',
'remote_group_id',
'remote_ip_prefix',
'security_group_id',
)
data = (
_security_group_rule.direction,
_security_group_rule.ethertype,
_security_group_rule.id,
_security_group_rule.port_range_max,
_security_group_rule.port_range_min,
_security_group_rule.project_id,
_security_group_rule.protocol,
_security_group_rule.remote_group_id,
_security_group_rule.remote_ip_prefix,
_security_group_rule.security_group_id,
)
def setUp(self):
super(TestShowSecurityGroupRuleNetwork, self).setUp()
self.network.find_security_group_rule = mock.Mock(
return_value=self._security_group_rule)
# Get the command object to test
self.cmd = security_group_rule.ShowSecurityGroupRule(
self.app, self.namespace)
def test_show_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
def test_show_all_options(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.find_security_group_rule.assert_called_with(
self._security_group_rule.id, ignore_missing=False)
self.assertEqual(tuple(self.columns), columns)
self.assertEqual(self.data, data)
class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be shown.
_security_group_rule = \
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
columns, data = \
security_group_rule._format_security_group_rule_show(
_security_group_rule._info)
def setUp(self):
super(TestShowSecurityGroupRuleCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
# Build a security group fake customized for this test.
security_group_rules = [self._security_group_rule._info]
security_group = fakes.FakeResource(
info=copy.deepcopy({'rules': security_group_rules}),
loaded=True)
security_group.rules = security_group_rules
self.compute.security_groups.list.return_value = [security_group]
# Get the command object to test
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
def test_show_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
def test_show_all_options(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.compute.security_groups.list.assert_called_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)

View File

@ -0,0 +1,5 @@
---
features:
- |
Add support for ``security group rule show`` command.
[Bug `1519512 <https://bugs.launchpad.net/bugs/1519512>`_]

View File

@ -340,6 +340,7 @@ openstack.network.v2 =
router_show = openstackclient.network.v2.router:ShowRouter
security_group_delete = openstackclient.network.v2.security_group:DeleteSecurityGroup
security_group_rule_delete = openstackclient.network.v2.security_group_rule:DeleteSecurityGroupRule
security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule
subnet_list = openstackclient.network.v2.subnet:ListSubnet
subnet_show = openstackclient.network.v2.subnet:ShowSubnet
subnet_pool_delete = openstackclient.network.v2.subnet_pool:DeleteSubnetPool