Consolidate Container and Capsule in compute

In before, create/delete Capsule has its own RPC api and compute
manager implementation. The code is largely duplicated with
the Container equivalent. In fact, the code duplication leads
to bugs or missing features and it is hard to maintain.

This commit refactor the compute node implementation for capsule.
First, the capsule RPC API is removed and the controller will
use the container RPC for create/delete capsule in compute node.
Second, the capsule implementation is removed from compute manager.
Instead, we will reuse the container implementation for capsule.
Third, we introduce capsule operation in container driver.
The capsule-specific logic will be implemented by different drivers.

After this patch, all existing container features (i.e. resource
tracking and claiming, asynchronized delete, etc.) will be available
for capsule immediately. In long term, the common implementation
for capsule and container will be easier to maintain.

Closes-Bug: #1801649
Closes-Bug: #1777273
Partial-Bug: #1762902
Partial-Bug: #1751193
Partial-Bug: #1748825
Change-Id: Ie1d806738fcd945a4f370bdfc7fa8fb5fb815e8d
This commit is contained in:
Hongbin Lu 2019-01-26 23:50:22 +00:00
parent b54a91b0c4
commit d0a7940981
13 changed files with 187 additions and 202 deletions

View File

@ -25,7 +25,7 @@
test-config:
$TEMPEST_CONFIG:
container_service:
min_microversion: 1.27
min_microversion: 1.32
devstack_services:
tempest: true
devstack_plugins:

View File

@ -13,7 +13,6 @@
# under the License.
from oslo_log import log as logging
import pecan
import six
@ -252,8 +251,13 @@ class CapsuleController(base.Controller):
new_capsule.cpu = capsule_need_cpu
new_capsule.memory = str(capsule_need_memory)
new_capsule.save(context)
compute_api.capsule_create(context, new_capsule, requested_networks,
requested_volumes, extra_spec)
kwargs = {}
kwargs['extra_spec'] = extra_spec
kwargs['requested_networks'] = requested_networks
kwargs['requested_volumes'] = requested_volumes
kwargs['run'] = True
compute_api.container_create(context, new_capsule, **kwargs)
# Set the HTTP Location Header
pecan.response.location = link.build_url('capsules',
new_capsule.uuid)
@ -291,7 +295,8 @@ class CapsuleController(base.Controller):
compute_api = pecan.request.compute_api
capsule.task_state = consts.CONTAINER_DELETING
capsule.save(context)
compute_api.capsule_delete(context, capsule)
compute_api.container_stop(context, capsule, 10)
compute_api.container_delete(context, capsule)
pecan.response.status = 204
def _generate_name_for_capsule_container(self, new_capsule):

View File

@ -64,10 +64,11 @@ REST_API_VERSION_HISTORY = """REST API Version History:
* 1.29 - Add enable_cpu_pinning to compute_node
* 1.30 - Introduce API resource for representing private registry
* 1.31 - Add 'registry_id' to containers
* 1.32 - Make capsule deletion asynchronized
"""
BASE_VER = '1.1'
CURRENT_MAX_VER = '1.31'
CURRENT_MAX_VER = '1.32'
class Version(object):

View File

@ -241,3 +241,10 @@ user documentation.
Add 'registry_id' to container resource.
This attribute indicate the registry from which the container pulls images.
1.32
----
Make capsule deletion asynchronized.
API request to delete a capsule will return without waiting for the
capsule to be deleted.

View File

@ -208,32 +208,6 @@ class API(object):
return self.rpcapi.image_search(context, image, image_driver,
exact_match, *args)
def capsule_create(self, context, new_capsule, requested_networks,
requested_volumes, extra_spec):
try:
host_state = self._schedule_container(context, new_capsule,
extra_spec)
except exception.NoValidHost:
new_capsule.status = consts.ERROR
new_capsule.status_reason = _(
"There are not enough hosts available.")
new_capsule.save(context)
return
except Exception:
new_capsule.status = consts.ERROR
new_capsule.status_reason = _("Unexpected exception occurred.")
new_capsule.save(context)
raise
for container in new_capsule.containers:
self._record_action_start(context, container,
container_actions.CREATE)
self.rpcapi.capsule_create(context, host_state['host'], new_capsule,
requested_networks, requested_volumes,
host_state['limits'])
def capsule_delete(self, context, capsule):
return self.rpcapi.capsule_delete(context, capsule)
def network_detach(self, context, container, *args):
self._record_action_start(context, container,
container_actions.NETWORK_DETACH)

View File

@ -319,9 +319,15 @@ class Manager(periodic_task.PeriodicTasks):
self.driver.read_tar_image(image)
if image['tag'] != tag:
LOG.warning("The input tag is different from the tag in tar")
container = self.driver.create(context, container, image,
requested_networks,
requested_volumes)
if isinstance(container, objects.Capsule):
container = self.driver.create_capsule(context, container,
image,
requested_networks,
requested_volumes)
elif isinstance(container, objects.Container):
container = self.driver.create(context, container, image,
requested_networks,
requested_volumes)
self._update_task_state(context, container, None)
return container
except exception.DockerError as e:
@ -502,7 +508,11 @@ class Manager(periodic_task.PeriodicTasks):
self._update_task_state(context, container, consts.CONTAINER_DELETING)
reraise = not force
try:
self.driver.delete(context, container, force)
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container, force)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, force)
if self.use_sandbox:
self._delete_sandbox(context, container, reraise)
except exception.DockerError as e:
@ -650,7 +660,10 @@ class Manager(periodic_task.PeriodicTasks):
try:
self._update_task_state(context, container,
consts.CONTAINER_DELETING)
self.driver.delete(context, container, True)
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, True)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Rebuild container: %s failed, "
@ -1162,83 +1175,6 @@ class Manager(periodic_task.PeriodicTasks):
self.driver.update_containers_states(ctx, containers, self)
capsules = objects.Capsule.list(ctx)
self.driver.update_containers_states(ctx, capsules, self)
LOG.debug('Complete syncing container states.')
def capsule_create(self, context, capsule, requested_networks,
requested_volumes, limits):
@utils.synchronized("capsule-" + capsule.uuid)
def do_capsule_create():
self._do_capsule_create(context, capsule, requested_networks,
requested_volumes, limits)
utils.spawn_n(do_capsule_create)
def _do_capsule_create(self, context, capsule,
requested_networks=None,
requested_volumes=None,
limits=None):
"""Create capsule in the compute node
:param context: security context
:param capsule: the special capsule object
:param requested_networks: the network ports that capsule will
connect
:param requested_volumes: the volume that capsule need
:param limits: no use field now.
"""
# NOTE(kevinz): Here create the sandbox container for the
# first function container --> capsule.containers[1].
# capsule.containers[0] will only be used as recording the
# the sandbox_container info, and the sandbox_id of this contianer
# is itself.
sandbox_id = self._create_sandbox(context,
capsule,
requested_networks)
# Create init containers first
if capsule.init_containers:
for container in capsule.init_containers:
self._do_capsule_create_each_container(context,
capsule,
container,
sandbox_id,
limits,
requested_volumes,
requested_networks)
for container in capsule.init_containers:
self._wait_for_containers_completed(context, container)
# Create common containers
for container in capsule.containers:
self._do_capsule_create_each_container(context,
capsule,
container,
sandbox_id,
limits,
requested_volumes,
requested_networks)
capsule.host = self.host
capsule.status = consts.RUNNING
capsule.save(context)
def capsule_delete(self, context, capsule):
# NOTE(kevinz): Delete functional containers first and then delete
# sandbox container
for container in (capsule.containers + capsule.init_containers):
try:
self._do_container_delete(context, container, force=True)
except Exception as e:
uuid = container.uuid
LOG.exception("Failed to delete container %(uuid0)s because "
"it doesn't exist in the capsule. Stale data "
"identified by %(uuid1)s is deleted from "
"database: %(error)s",
{'uuid0': uuid, 'uuid1': uuid, 'error': e})
try:
self._delete_sandbox(context, capsule, reraise=False)
self._do_container_delete(context, capsule, force=True)
except Exception as e:
LOG.exception(e)
def network_detach(self, context, container, network):
@utils.synchronized(container.uuid)
@ -1292,47 +1228,3 @@ class Manager(periodic_task.PeriodicTasks):
self.container_update(context, container, patch)
utils.spawn_n(do_container_resize)
def _do_capsule_create_each_container(self, context, capsule,
container, sandbox_id,
limits=None,
requested_volumes=None,
requested_networks=None):
container_requested_volumes = []
container.set_sandbox_id(sandbox_id)
container.addresses = capsule.addresses
container_name = container.name
for volume in requested_volumes:
if volume.get(container_name, None):
container_requested_volumes.append(
volume.get(container_name))
self._attach_volumes(context, container,
container_requested_volumes)
# Make sure the sandbox_id is set into meta. If not,
# when container delete, it will delete container network
# without considering sandbox.
container.save(context)
# Add volume assignment
created_container = \
self._do_container_create_base(context,
container,
requested_networks,
container_requested_volumes,
sandbox=capsule,
limits=limits)
self._do_container_start(context, created_container)
def _wait_for_containers_completed(self, context, container,
timeout=60, poll_interval=1):
start_time = time.time()
while time.time() - start_time < timeout:
container = self.driver.show(context, container)
if container.status == consts.STOPPED:
return
time.sleep(poll_interval)
msg = _('Init container %(container_name)s failed: ') % {
'container_name': container.name
}
self._fail_container(context, container, msg, unset_host=True)
raise exception.Invalid(msg)

View File

@ -66,7 +66,7 @@ class API(rpc_service.API):
pci_requests=pci_requests)
@check_container_host
def container_delete(self, context, container, force):
def container_delete(self, context, container, force=False):
return self._cast(container.host, 'container_delete',
container=container, force=force)

View File

@ -1260,3 +1260,120 @@ class DockerDriver(driver.ContainerDriver):
network_api = zun_network.api(context,
docker_api=docker)
network_api.remove_network(network)
def create_capsule(self, context, capsule, image, requested_networks,
requested_volumes):
capsule = self.create(context, capsule, image, requested_networks,
requested_volumes)
self.start(context, capsule)
for container in capsule.containers:
self._create_container_in_capsule(context, capsule, container,
requested_networks,
requested_volumes)
return capsule
def _create_container_in_capsule(self, context, capsule, container,
requested_volumes, requested_networks):
# pull image
image_driver_name = container.image_driver
repo, tag = utils.parse_image_name(container.image, image_driver_name)
image_pull_policy = utils.get_image_pull_policy(
container.image_pull_policy, tag)
image, image_loaded = self.pull_image(
context, repo, tag, image_pull_policy, image_driver_name)
image['repo'], image['tag'] = repo, tag
if not image_loaded:
self.load_image(image['path'])
if image_driver_name == 'glance':
self.read_tar_image(image)
if image['tag'] != tag:
LOG.warning("The input tag is different from the tag in tar")
# create container
with docker_utils.docker_client() as docker:
name = container.name
LOG.debug('Creating container with image %(image)s name %(name)s',
{'image': image['image'], 'name': name})
binds = self._get_binds(context, requested_volumes)
kwargs = {
'name': self.get_container_name(container),
'command': container.command,
'environment': container.environment,
'working_dir': container.workdir,
'labels': container.labels,
'tty': container.interactive,
'stdin_open': container.interactive,
}
host_config = {}
host_config['privileged'] = container.privileged
host_config['binds'] = binds
kwargs['volumes'] = [b['bind'] for b in binds.values()]
host_config['network_mode'] = 'container:%s' % capsule.container_id
# TODO(hongbin): Uncomment this after docker-py add support for
# container mode for pid namespace.
# host_config['pid_mode'] = 'container:%s' % capsule.container_id
host_config['ipc_mode'] = 'container:%s' % capsule.container_id
if container.auto_remove:
host_config['auto_remove'] = container.auto_remove
if container.memory is not None:
host_config['mem_limit'] = str(container.memory) + 'M'
if container.cpu is not None:
host_config['cpu_quota'] = int(100000 * container.cpu)
host_config['cpu_period'] = 100000
if container.restart_policy:
count = int(container.restart_policy['MaximumRetryCount'])
name = container.restart_policy['Name']
host_config['restart_policy'] = {'Name': name,
'MaximumRetryCount': count}
if container.disk:
disk_size = str(container.disk) + 'G'
host_config['storage_opt'] = {'size': disk_size}
# The time unit in docker of heath checking is us, and the unit
# of interval and timeout is seconds.
if container.healthcheck:
healthcheck = {}
healthcheck['test'] = container.healthcheck.get('test', '')
interval = container.healthcheck.get('interval', 0)
healthcheck['interval'] = interval * 10 ** 9
healthcheck['retries'] = int(container.healthcheck.
get('retries', 0))
timeout = container.healthcheck.get('timeout', 0)
healthcheck['timeout'] = timeout * 10 ** 9
kwargs['healthcheck'] = healthcheck
kwargs['host_config'] = docker.create_host_config(**host_config)
if image['tag']:
image_repo = image['repo'] + ":" + image['tag']
else:
image_repo = image['repo']
response = docker.create_container(image_repo, **kwargs)
container.container_id = response['Id']
docker.start(container.container_id)
response = docker.inspect_container(container.container_id)
self._populate_container(container, response)
container.save(context)
def delete_capsule(self, context, capsule, force):
for container in capsule.containers:
self._delete_container_in_capsule(context, capsule, container,
force)
self.delete(context, capsule, force)
def _delete_container_in_capsule(self, context, capsule, container, force):
if not container.container_id:
return
with docker_utils.docker_client() as docker:
try:
docker.stop(container.container_id)
docker.remove_container(container.container_id,
force=force)
except errors.APIError as api_error:
if is_not_found(api_error):
return
if is_not_connected(api_error):
return
raise

View File

@ -297,3 +297,9 @@ class ContainerDriver(object):
def delete_image(self, context, img_id, image_driver):
raise NotImplementedError()
def create_capsule(self, context, capsule, **kwargs):
raise NotImplementedError()
def delete_capsule(self, context, capsule, **kwargs):
raise NotImplementedError()

View File

@ -26,7 +26,7 @@ from zun.tests.unit.db import base
PATH_PREFIX = '/v1'
CURRENT_VERSION = "container 1.31"
CURRENT_VERSION = "container 1.32"
class FunctionalTest(base.DbTestCase):

View File

@ -28,7 +28,7 @@ class TestRootController(api_base.FunctionalTest):
'default_version':
{'id': 'v1',
'links': [{'href': 'http://localhost/v1/', 'rel': 'self'}],
'max_version': '1.31',
'max_version': '1.32',
'min_version': '1.1',
'status': 'CURRENT'},
'description': 'Zun is an OpenStack project which '
@ -37,7 +37,7 @@ class TestRootController(api_base.FunctionalTest):
'versions': [{'id': 'v1',
'links': [{'href': 'http://localhost/v1/',
'rel': 'self'}],
'max_version': '1.31',
'max_version': '1.32',
'min_version': '1.1',
'status': 'CURRENT'}]}

View File

@ -21,7 +21,7 @@ from zun.tests.unit.db import utils
class TestCapsuleController(api_base.FunctionalTest):
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule(self, mock_capsule_create,
mock_neutron_get_network):
@ -56,7 +56,7 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertTrue(mock_capsule_create.called)
self.assertTrue(mock_neutron_get_network.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_two_containers(self, mock_capsule_create,
mock_neutron_get_network):
@ -92,7 +92,7 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertTrue(mock_capsule_create.called)
self.assertTrue(mock_neutron_get_network.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.common.utils.check_capsule_template')
def test_create_capsule_wrong_kind_set(self, mock_check_template,
mock_capsule_create):
@ -109,7 +109,7 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertEqual(400, response.status_int)
self.assertFalse(mock_capsule_create.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.common.utils.check_capsule_template')
def test_create_capsule_less_than_one_container(self, mock_check_template,
mock_capsule_create):
@ -123,7 +123,7 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertEqual(400, response.status_int)
self.assertFalse(mock_capsule_create.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.common.utils.check_capsule_template')
def test_create_capsule_no_container_field(self, mock_check_template,
mock_capsule_create):
@ -137,7 +137,7 @@ class TestCapsuleController(api_base.FunctionalTest):
params=params, content_type='application/json')
self.assertFalse(mock_capsule_create.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.common.utils.check_capsule_template')
def test_create_capsule_no_container_image(self, mock_check_template,
mock_capsule_create):
@ -152,7 +152,7 @@ class TestCapsuleController(api_base.FunctionalTest):
params=params, content_type='application/json')
self.assertFalse(mock_capsule_create.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_with_init_containers(self, mock_capsule_create,
mock_neutron_get_network):
@ -192,7 +192,7 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertTrue(mock_capsule_create.called)
self.assertTrue(mock_neutron_get_network.called)
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_with_two_init_containers(self, mock_capsule_create,
mock_neutron_get_network):
@ -233,7 +233,7 @@ class TestCapsuleController(api_base.FunctionalTest):
@patch('zun.volume.cinder_api.CinderAPI.ensure_volume_usable')
@patch('zun.volume.cinder_api.CinderAPI.create_volume')
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_with_create_new_volume(self, mock_capsule_create,
mock_neutron_get_network,
@ -283,7 +283,7 @@ class TestCapsuleController(api_base.FunctionalTest):
@patch('zun.volume.cinder_api.CinderAPI.ensure_volume_usable')
@patch('zun.volume.cinder_api.CinderAPI.search_volume')
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_with_existed_volume(self, mock_capsule_create,
mock_neutron_get_network,
@ -336,7 +336,7 @@ class TestCapsuleController(api_base.FunctionalTest):
@patch('zun.volume.cinder_api.CinderAPI.create_volume')
@patch('zun.volume.cinder_api.CinderAPI.ensure_volume_usable')
@patch('zun.volume.cinder_api.CinderAPI.search_volume')
@patch('zun.compute.api.API.capsule_create')
@patch('zun.compute.api.API.container_create')
@patch('zun.network.neutron.NeutronAPI.get_available_network')
def test_create_capsule_with_two_volumes(self, mock_capsule_create,
mock_neutron_get_network,
@ -441,18 +441,14 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertEqual(test_capsule['uuid'],
response.json['uuid'])
@patch('zun.compute.api.API.capsule_delete')
@patch('zun.compute.api.API.container_delete')
@patch('zun.compute.api.API.container_stop')
@patch('zun.objects.Capsule.get_by_uuid')
@patch('zun.objects.Container.get_by_uuid')
@patch('zun.objects.Capsule.save')
def test_delete_capsule_by_uuid(self, mock_capsule_save,
mock_container_get_by_uuid,
mock_capsule_get_by_uuid,
mock_capsule_stop,
mock_capsule_delete):
test_container = utils.get_test_container()
test_container_obj = objects.Container(self.context, **test_container)
mock_container_get_by_uuid.return_value = test_container_obj
test_capsule = utils.create_test_container(context=self.context)
test_capsule_obj = objects.Capsule(self.context,
**test_capsule)
@ -464,26 +460,23 @@ class TestCapsuleController(api_base.FunctionalTest):
response = self.app.delete('/v1/capsules/%s' % capsule_uuid)
self.assertTrue(mock_capsule_delete.called)
self.assertTrue(mock_capsule_stop.called)
self.assertEqual(204, response.status_int)
context = mock_capsule_save.call_args[0][0]
self.assertIs(False, context.all_projects)
@patch('zun.common.policy.enforce')
@patch('zun.compute.api.API.capsule_delete')
@patch('zun.compute.api.API.container_delete')
@patch('zun.compute.api.API.container_stop')
@patch('zun.objects.Capsule.get_by_uuid')
@patch('zun.objects.Container.get_by_uuid')
@patch('zun.objects.Capsule.save')
def test_delete_capsule_by_uuid_all_projects(self,
mock_capsule_save,
mock_container_get_by_uuid,
mock_capsule_get_by_uuid,
mock_capsule_stop,
mock_capsule_delete,
mock_policy):
mock_policy.return_value = True
test_container = utils.get_test_container()
test_container_obj = objects.Container(self.context, **test_container)
mock_container_get_by_uuid.return_value = test_container_obj
test_capsule = utils.create_test_container(context=self.context)
test_capsule_obj = objects.Capsule(self.context,
**test_capsule)
@ -496,6 +489,7 @@ class TestCapsuleController(api_base.FunctionalTest):
'/v1/capsules/%s/?all_projects=1' % capsule_uuid)
self.assertTrue(mock_capsule_delete.called)
self.assertTrue(mock_capsule_stop.called)
self.assertEqual(204, response.status_int)
context = mock_capsule_save.call_args[0][0]
self.assertIs(True, context.all_projects)
@ -505,18 +499,14 @@ class TestCapsuleController(api_base.FunctionalTest):
self.assertRaises(AppError, self.app.delete,
'/capsules/%s' % uuid)
@patch('zun.compute.api.API.capsule_delete')
@patch('zun.compute.api.API.container_delete')
@patch('zun.compute.api.API.container_stop')
@patch('zun.objects.Capsule.get_by_name')
@patch('zun.objects.Container.get_by_uuid')
@patch('zun.objects.Capsule.save')
def test_delete_capsule_by_name(self, mock_capsule_save,
mock_container_get_by_name,
mock_capsule_get_by_uuid,
mock_capsule_stop,
mock_capsule_delete):
test_container = utils.get_test_container()
test_container_obj = objects.Container(self.context, **test_container)
mock_container_get_by_name.return_value = test_container_obj
test_capsule = utils.create_test_container(context=self.context)
test_capsule_obj = objects.Capsule(self.context,
**test_capsule)
@ -529,6 +519,7 @@ class TestCapsuleController(api_base.FunctionalTest):
capsule_name)
self.assertTrue(mock_capsule_delete.called)
self.assertTrue(mock_capsule_stop.called)
self.assertEqual(204, response.status_int)
context = mock_capsule_save.call_args[0][0]
self.assertIs(False, context.all_projects)

View File

@ -116,14 +116,6 @@ class TestAPI(base.TestCase):
self.assertTrue(mock_save.called)
self.assertEqual(consts.ERROR, container.status)
@mock.patch('zun.compute.rpcapi.API._call')
def test_capsule_delete(self, mock_call):
capsule = self.container
self.compute_api.capsule_delete(
self.context, capsule)
mock_call.assert_called_once_with(
capsule.host, "capsule_delete", capsule=capsule)
@mock.patch('zun.compute.rpcapi.API._cast')
@mock.patch('zun.api.servicegroup.ServiceGroup.service_is_up')
@mock.patch('zun.objects.ZunService.list_by_binary')