Check if security groups is duplicated or not in API

Change-Id: I393620562b8f1f733a62ade12f29706519166dbd
Closes-bug:1698471
This commit is contained in:
wangzhh 2017-07-23 13:27:36 -04:00
parent 080f4fb00f
commit 7d3f1a0558
6 changed files with 110 additions and 28 deletions

View File

@ -310,13 +310,21 @@ class ContainersController(base.Controller):
utils.validate_container_state(container, 'add_security_group')
# check if security group already presnt in container
context = pecan.request.context
compute_api = pecan.request.compute_api
if security_group['name'] in container.security_groups:
msg = _("security_group %s already present in container") % \
security_group['name']
raise exception.InvalidValue(msg)
security_group_id = utils.\
get_security_group_ids(context, [security_group['name']])[0]
container_ports_detail = utils.list_ports(context, container)
context = pecan.request.context
compute_api = pecan.request.compute_api
for container_port_detail in container_ports_detail:
if security_group_id in container_port_detail['security_groups']:
msg = _("security_group %s already present in container") % \
security_group['name']
raise exception.InvalidValue(msg)
compute_api.add_security_group(context, container,
security_group['name'])
pecan.response.status = 202

View File

@ -28,6 +28,7 @@ from oslo_service import loopingcall
import pecan
import six
from zun.common import clients
from zun.common import consts
from zun.common import exception
from zun.common.i18n import _
@ -279,3 +280,36 @@ def parse_floating_cpu(spec):
raise exception.Invalid()
return cpuset_ids
def list_ports(context, container, **kwargs):
container_ports = []
if not container.addresses:
return container_ports
else:
port_list = []
neutron_client = clients.OpenStackClients(context).neutron()
for ports in container.addresses.values():
port_list.extend([p['port'] for p in ports])
port_set = set(port_list)
for port in port_set:
container_ports.append(neutron_client.show_port(port)['port'])
return container_ports
def get_security_group_ids(context, security_groups, **kwargs):
if security_groups is None:
return None
else:
neutron = clients.OpenStackClients(context).neutron()
search_opts = {'tenant_id': context.project_id}
security_groups_list = neutron.list_security_groups(
**search_opts).get('security_groups', [])
security_group_ids = [item['id'] for item in security_groups_list
if item['name'] in security_groups]
if len(security_group_ids) == len(security_groups):
return security_group_ids
else:
raise exception.ZunException(_(
"Any of the security group in %s is not found ") %
security_groups)

View File

@ -183,8 +183,8 @@ class DockerDriver(driver.ContainerDriver):
def _setup_network_for_container(self, context, container,
requested_networks, network_api):
security_group_ids = self._get_security_group_ids(
context, container.security_groups)
security_group_ids = utils.get_security_group_ids(context, container.
security_groups)
# Container connects to the bridge network by default so disconnect
# the container from it before connecting it to neutron network.
# This avoids potential conflict between these two networks.
@ -659,23 +659,6 @@ class DockerDriver(driver.ContainerDriver):
docker.start(sandbox['Id'])
return sandbox['Id']
def _get_security_group_ids(self, context, security_groups):
if security_groups is None:
return None
else:
neutron = clients.OpenStackClients(context).neutron()
search_opts = {'tenant_id': context.project_id}
security_groups_list = neutron.list_security_groups(
**search_opts).get('security_groups', [])
security_group_ids = [item['id'] for item in security_groups_list
if item['name'] in security_groups]
if len(security_group_ids) == len(security_groups):
return security_group_ids
else:
raise exception.ZunException(_(
"Any of the security group in %s is not found ") %
security_groups)
def _get_available_network(self, context):
neutron = clients.OpenStackClients(context).neutron()
search_opts = {'tenant_id': context.project_id, 'shared': False}
@ -765,7 +748,7 @@ class DockerDriver(driver.ContainerDriver):
def add_security_group(self, context, container, security_group,
sandbox_id=None):
security_group_ids = self._get_security_group_ids(
security_group_ids = utils.get_security_group_ids(
context, [security_group])
with docker_utils.docker_client() as docker:
network_api = zun_network.api(context=context, docker_api=docker)

View File

@ -1372,6 +1372,32 @@ class TestContainerController(api_base.FunctionalTest):
mock.ANY, test_container_obj, fake_exec_id, kwargs['h'],
kwargs['w'])
@mock.patch('zun.common.utils.get_security_group_ids')
@mock.patch('zun.common.utils.list_ports')
@mock.patch('zun.api.utils.get_resource')
def test_add_duplicate_default_security_group(self, mock_get_resource,
mock_list_ports,
mock_get_security_group_ids):
test_container = utils.get_test_container()
test_container_obj = objects.Container(self.context, **test_container)
test_container_obj.security_groups = []
mock_get_resource.return_value = test_container_obj
mock_list_ports.return_value = \
[{'security_groups': ['fake_default_security_group_id']}]
mock_get_security_group_ids.return_value = \
['fake_default_security_group_id']
container_name = test_container.get('name')
default_security_group = 'default'
url = '/v1/containers/%s/%s?name=%s' % (container_name,
'add_security_group',
default_security_group)
response = self.app.post(url, expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertEqual(
"security_group %s already present in container" %
default_security_group, response.json['errors'][0]['detail'])
class TestContainerEnforcement(api_base.FunctionalTest):

View File

@ -103,3 +103,37 @@ class TestUtils(base.TestCase):
container.status = 'Running'
self.assertIsNone(utils.validate_container_state(
container, 'execute'))
@mock.patch('zun.common.clients.OpenStackClients.neutron')
def test_list_ports(self, mock_neutron_client):
container = Container(self.context, **db_utils.get_test_container())
container.addresses = None
self.assertEqual([], utils.list_ports(self.context, container))
container.addresses = {'df1e72a0-0251': [
{'version': 4,
'addr': '172.24.4.13',
'port': '2a3fcf1'}]}
neutron_client_instance = mock.MagicMock()
neutron_client_instance.show_port.return_value = \
{'port': 'test_port_info'}
mock_neutron_client.return_value = neutron_client_instance
result_container_ports = utils.list_ports(self.context, container)
self.assertEqual(['test_port_info'], result_container_ports)
@mock.patch('zun.common.clients.OpenStackClients.neutron')
def test_get_security_group_ids(self, mock_neutron_client):
security_groups = None
self.assertIsNone(utils.get_security_group_ids(self.context,
security_groups))
security_groups = ['test_security_group_name']
neutron_client_instance = mock.MagicMock()
neutron_client_instance.list_security_groups.return_value = \
{'security_groups': [{'id': 'test_security_group_id',
'name': 'test_security_group_name'}]}
mock_neutron_client.return_value = neutron_client_instance
security_group_ids = utils.get_security_group_ids(self.context,
security_groups)
self.assertEqual(['test_security_group_id'], security_group_ids)
security_groups = ["not_attached_security_group_name"]
self.assertRaises(exception.ZunException, utils.get_security_group_ids,
self.context, security_groups)

View File

@ -81,8 +81,7 @@ class TestDockerDriver(base.DriverTestCase):
@mock.patch('zun.network.kuryr_network.KuryrNetwork'
'.connect_container_to_network')
@mock.patch(
'zun.container.docker.driver.DockerDriver._get_security_group_ids')
@mock.patch('zun.common.utils.get_security_group_ids')
@mock.patch('zun.objects.container.Container.save')
def test_create_image_path_is_none(self, mock_save,
mock_get_security_group_ids,
@ -316,8 +315,7 @@ class TestDockerDriver(base.DriverTestCase):
@mock.patch('zun.network.kuryr_network.KuryrNetwork'
'.connect_container_to_network')
@mock.patch('zun.container.docker.driver.DockerDriver.get_sandbox_name')
@mock.patch(
'zun.container.docker.driver.DockerDriver._get_security_group_ids')
@mock.patch('zun.common.utils.get_security_group_ids')
def test_create_sandbox(self, mock_get_security_group_ids,
mock_get_sandbox_name, mock_connect):
sandbox_name = 'my_test_sandbox'
@ -337,8 +335,7 @@ class TestDockerDriver(base.DriverTestCase):
@mock.patch('zun.network.kuryr_network.KuryrNetwork'
'.connect_container_to_network')
@mock.patch('zun.container.docker.driver.DockerDriver.get_sandbox_name')
@mock.patch(
'zun.container.docker.driver.DockerDriver._get_security_group_ids')
@mock.patch('zun.common.utils.get_security_group_ids')
def test_create_sandbox_with_long_name(self, mock_get_security_group_ids,
mock_get_sandbox_name,
mock_connect):