Merge "Fix the broken add_security_group endpoint"

This commit is contained in:
Jenkins 2017-08-07 06:41:53 +00:00 committed by Gerrit Code Review
commit 2484575576
3 changed files with 69 additions and 30 deletions

View File

@ -183,6 +183,11 @@ class ZunClient(rest_client.RestClient):
return self.get( return self.get(
self.container_uri(container_id, action='stats'), None, **kwargs) self.container_uri(container_id, action='stats'), None, **kwargs)
def add_security_group(self, container_id, model, **kwargs):
return self.post(
self.container_uri(container_id, action='add_security_group'),
body=model.to_json(), **kwargs)
def list_services(self, **kwargs): def list_services(self, **kwargs):
resp, body = self.get(self.services_uri(), **kwargs) resp, body = self.get(self.services_uri(), **kwargs)
return self.deserialize(resp, body, return self.deserialize(resp, body,

View File

@ -92,3 +92,14 @@ def container_rename_data(**kwargs):
model = container_model.ContainerPatchEntity.from_dict(data) model = container_model.ContainerPatchEntity.from_dict(data)
return model return model
def container_add_sg_data(**kwargs):
data = {
'name': 'sg_name',
}
data.update(kwargs)
model = container_model.ContainerPatchEntity.from_dict(data)
return model

View File

@ -159,42 +159,18 @@ class TestContainer(base.BaseZunTest):
gen_model = datagen.container_data() gen_model = datagen.container_data()
delattr(gen_model, 'security_groups') delattr(gen_model, 'security_groups')
_, model = self._run_container(gen_model=gen_model) _, model = self._run_container(gen_model=gen_model)
sgs = self._get_all_security_groups(model)
# find the neutron port of this container self.assertEqual(1, len(sgs))
port_ids = set() self.assertEqual('default', sgs[0])
for addrs_list in model.addresses.values():
for addr in addrs_list:
port_id = addr['port']
port_ids.add(port_id)
self.assertEqual(1, len(port_ids))
# verify default security_group is applied
port_id = port_ids.pop()
port = self.ports_client.show_port(port_id)
sg_ids = port['port']['security_groups']
self.assertEqual(1, len(sg_ids))
sg = self.sgs_client.show_security_group(sg_ids[0])
self.assertEqual('default', sg['security_group']['name'])
@decorators.idempotent_id('f181eeda-a9d1-4b2e-9746-d6634ca81e2f') @decorators.idempotent_id('f181eeda-a9d1-4b2e-9746-d6634ca81e2f')
def test_run_container_with_security_groups(self): def test_run_container_with_security_groups(self):
sg_name = 'test_sg' sg_name = 'test_sg'
self.sgs_client.create_security_group(name=sg_name) self.sgs_client.create_security_group(name=sg_name)
_, model = self._run_container(security_groups=[sg_name]) _, model = self._run_container(security_groups=[sg_name])
sgs = self._get_all_security_groups(model)
# find the neutron port of this container self.assertEqual(1, len(sgs))
port_ids = set() self.assertEqual(sg_name, sgs[0])
for addrs_list in model.addresses.values():
for addr in addrs_list:
port_id = addr['port']
port_ids.add(port_id)
self.assertEqual(1, len(port_ids))
# verify default security_group is applied
port_id = port_ids.pop()
port = self.ports_client.show_port(port_id)
sg_ids = port['port']['security_groups']
self.assertEqual(1, len(sg_ids))
sg = self.sgs_client.show_security_group(sg_ids[0])
self.assertEqual(sg_name, sg['security_group']['name'])
@decorators.idempotent_id('c3f02fa0-fdfb-49fc-95e2-6e4dc982f9be') @decorators.idempotent_id('c3f02fa0-fdfb-49fc-95e2-6e4dc982f9be')
def test_commit_container(self): def test_commit_container(self):
@ -367,6 +343,31 @@ class TestContainer(base.BaseZunTest):
self.assertTrue('MEM %' in encodeutils.safe_decode(body)) self.assertTrue('MEM %' in encodeutils.safe_decode(body))
self.assertTrue('BLOCK I/O(B)' in encodeutils.safe_decode(body)) self.assertTrue('BLOCK I/O(B)' in encodeutils.safe_decode(body))
@decorators.idempotent_id('b3b9cf17-82ad-4c1b-a4af-8210a778a33e')
def test_add_sg_to_container(self):
_, model = self._run_container()
sgs = self._get_all_security_groups(model)
self.assertEqual(1, len(sgs))
self.assertEqual('default', sgs[0])
sg_name = 'test_add_sg'
self.sgs_client.create_security_group(name=sg_name)
gen_model = datagen.container_add_sg_data(name=sg_name)
resp, body = self.container_client.add_security_group(
model.uuid, gen_model)
self.assertEqual(202, resp.status)
def assert_security_group_is_added():
sgs = self._get_all_security_groups(model)
if len(sgs) == 2:
self.assertTrue('default' in sgs)
self.assertTrue(sg_name in sgs)
return True
else:
return False
utils.wait_for_condition(assert_security_group_is_added)
def _assert_resource_constraints(self, container, cpu=None, memory=None): def _assert_resource_constraints(self, container, cpu=None, memory=None):
if cpu is not None: if cpu is not None:
cpu_quota = container.get('HostConfig').get('CpuQuota') cpu_quota = container.get('HostConfig').get('CpuQuota')
@ -427,3 +428,25 @@ class TestContainer(base.BaseZunTest):
return 'Created' return 'Created'
else: else:
return 'Stopped' return 'Stopped'
def _get_all_security_groups(self, container):
# find all neutron ports of this container
port_ids = set()
for addrs_list in container.addresses.values():
for addr in addrs_list:
port_id = addr['port']
port_ids.add(port_id)
# find all security groups of this container
sg_ids = set()
for port_id in port_ids:
port = self.ports_client.show_port(port_id)
for sg in port['port']['security_groups']:
sg_ids.add(sg)
sg_names = []
for sg_id in sg_ids:
sg = self.sgs_client.show_security_group(sg_id)
sg_names.append(sg['security_group']['name'])
return sg_names