Sync container's host in periodic task
The container's host needs to be synced periodically because the value of the host is mutable (in config file). If the value of a host is changed but it is not synced to containers, the rpc request will send to the old host which will fail. In order to squeeze the logic of syncing host, the existing periodic task was re-written because the original task was mainly designed to sync a single container state (i.e. status). This commit re-wrote the implementation make it generic and can be leveraged to sync various container states (i.e. host, status). In addition, this commit moved the implementation to container driver since syncing container states seems to be driver-specific. Change-Id: I998af06989363b5329ae1f853dd01388ddaeb1cc
This commit is contained in:
parent
ffbad00b50
commit
dd5e21858d
@ -27,7 +27,7 @@ from zun.common.utils import check_container_id
|
||||
import zun.conf
|
||||
from zun.container.docker import utils as docker_utils
|
||||
from zun.container import driver
|
||||
|
||||
from zun import objects
|
||||
|
||||
CONF = zun.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -112,10 +112,54 @@ class DockerDriver(driver.ContainerDriver):
|
||||
return
|
||||
raise
|
||||
|
||||
def list(self):
|
||||
def list(self, context):
|
||||
id_to_container_map = {}
|
||||
with docker_utils.docker_client() as docker:
|
||||
return [container for container in docker.list_containers()
|
||||
if 'zun-sandbox-' not in container['Names'][0]]
|
||||
id_to_container_map = {c['Id']: c
|
||||
for c in docker.list_containers()}
|
||||
|
||||
db_containers = objects.Container.list_by_host(context, CONF.host)
|
||||
for db_container in db_containers:
|
||||
container_id = db_container.container_id
|
||||
docker_container = id_to_container_map.get(container_id)
|
||||
if docker_container:
|
||||
self._populate_container(db_container, docker_container)
|
||||
else:
|
||||
# Set to error state if the container was recorded in DB but
|
||||
# missing in docker.
|
||||
db_container.status = consts.ERROR
|
||||
|
||||
return db_containers
|
||||
|
||||
def update_containers_states(self, context, containers):
|
||||
my_containers = self.list(context)
|
||||
if not my_containers:
|
||||
return
|
||||
|
||||
id_to_my_container_map = {container.container_id: container
|
||||
for container in my_containers}
|
||||
id_to_container_map = {container.container_id: container
|
||||
for container in containers}
|
||||
|
||||
for cid in (six.viewkeys(id_to_container_map) &
|
||||
six.viewkeys(id_to_my_container_map)):
|
||||
container = id_to_container_map[cid]
|
||||
# sync status
|
||||
my_container = id_to_my_container_map[cid]
|
||||
if container.status != my_container.status:
|
||||
old_status = container.status
|
||||
container.status = my_container.status
|
||||
container.save(context)
|
||||
LOG.info('Status of container %s changed from %s to %s',
|
||||
container.uuid, old_status, container.status)
|
||||
# sync host
|
||||
my_host = CONF.host
|
||||
if container.host != my_host:
|
||||
old_host = container.host
|
||||
container.host = my_host
|
||||
container.save(context)
|
||||
LOG.info('Host of container %s changed from %s to %s',
|
||||
container.uuid, old_host, container.host)
|
||||
|
||||
def show(self, container):
|
||||
with docker_utils.docker_client() as docker:
|
||||
@ -164,31 +208,31 @@ class DockerDriver(driver.ContainerDriver):
|
||||
return
|
||||
|
||||
def _populate_container(self, container, response):
|
||||
status = response.get('State')
|
||||
if status:
|
||||
state = response.get('State')
|
||||
if type(state) is dict:
|
||||
status_detail = ''
|
||||
if status.get('Error'):
|
||||
if state.get('Error'):
|
||||
container.status = consts.ERROR
|
||||
status_detail = self.format_status_detail(
|
||||
status.get('FinishedAt'))
|
||||
state.get('FinishedAt'))
|
||||
container.status_detail = "Exited({}) {} ago " \
|
||||
"(error)".format(status.get('ExitCode'), status_detail)
|
||||
elif status.get('Paused'):
|
||||
"(error)".format(state.get('ExitCode'), status_detail)
|
||||
elif state.get('Paused'):
|
||||
container.status = consts.PAUSED
|
||||
status_detail = self.format_status_detail(
|
||||
status.get('StartedAt'))
|
||||
state.get('StartedAt'))
|
||||
container.status_detail = "Up {} (paused)".format(
|
||||
status_detail)
|
||||
elif status.get('Running'):
|
||||
elif state.get('Running'):
|
||||
container.status = consts.RUNNING
|
||||
status_detail = self.format_status_detail(
|
||||
status.get('StartedAt'))
|
||||
state.get('StartedAt'))
|
||||
container.status_detail = "Up {}".format(
|
||||
status_detail)
|
||||
else:
|
||||
started_at = self.format_status_detail(status.get('StartedAt'))
|
||||
started_at = self.format_status_detail(state.get('StartedAt'))
|
||||
finished_at = self.format_status_detail(
|
||||
status.get('FinishedAt'))
|
||||
state.get('FinishedAt'))
|
||||
if started_at == "":
|
||||
container.status = consts.CREATED
|
||||
container.status_detail = "Created"
|
||||
@ -198,9 +242,23 @@ class DockerDriver(driver.ContainerDriver):
|
||||
else:
|
||||
container.status = consts.STOPPED
|
||||
container.status_detail = "Exited({}) {} ago ".format(
|
||||
status.get('ExitCode'), finished_at)
|
||||
state.get('ExitCode'), finished_at)
|
||||
if status_detail is None:
|
||||
container.status_detail = None
|
||||
else:
|
||||
if state.lower() == 'created':
|
||||
container.status = consts.CREATED
|
||||
elif state.lower() == 'paused':
|
||||
container.status = consts.PAUSED
|
||||
elif state.lower() == 'running':
|
||||
container.status = consts.RUNNING
|
||||
elif state.lower() == 'dead':
|
||||
container.status = consts.ERROR
|
||||
elif state.lower() in ('restarting', 'exited', 'removing'):
|
||||
container.status = consts.STOPPED
|
||||
else:
|
||||
container.status = consts.UNKNOWN
|
||||
container.status_detail = None
|
||||
|
||||
config = response.get('Config')
|
||||
if config:
|
||||
|
@ -67,10 +67,14 @@ class ContainerDriver(object):
|
||||
"""Delete a container."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def list(self):
|
||||
def list(self, context):
|
||||
"""List all containers."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def update_containers_states(self, context, containers):
|
||||
"""Update containers states."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def show(self, container):
|
||||
"""Show the details of a container."""
|
||||
raise NotImplementedError()
|
||||
|
@ -33,7 +33,8 @@ class Container(base.ZunPersistentObject, base.ZunObject):
|
||||
# Version 1.11: Add image_driver
|
||||
# Version 1.12: Add 'Created' to ContainerStatus
|
||||
# Version 1.13: Add more task states for container
|
||||
VERSION = '1.13'
|
||||
# Version 1.14: Add method 'list_by_host'
|
||||
VERSION = '1.14'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
@ -125,6 +126,18 @@ class Container(base.ZunPersistentObject, base.ZunObject):
|
||||
sort_dir=sort_dir, filters=filters)
|
||||
return Container._from_db_object_list(db_containers, cls, context)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def list_by_host(cls, context, host):
|
||||
"""Return a list of Container objects by host.
|
||||
|
||||
:param context: Security context.
|
||||
:param host: A compute host.
|
||||
:returns: a list of :class:`Container` object.
|
||||
|
||||
"""
|
||||
db_containers = dbapi.list_containers(context, filters={'host': host})
|
||||
return Container._from_db_object_list(db_containers, cls, context)
|
||||
|
||||
@base.remotable
|
||||
def create(self, context):
|
||||
"""Create a Container record in the DB.
|
||||
|
@ -12,12 +12,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
import functools
|
||||
import six
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_service import periodic_task
|
||||
|
||||
from zun.common import consts
|
||||
from zun.common import context
|
||||
from zun.compute.compute_node_tracker import ComputeNodeTracker
|
||||
from zun.container import driver
|
||||
@ -34,85 +32,23 @@ def set_context(func):
|
||||
return handler
|
||||
|
||||
|
||||
class ContainerStatusSyncPeriodicJob(periodic_task.PeriodicTasks):
|
||||
class ContainerStateSyncPeriodicJob(periodic_task.PeriodicTasks):
|
||||
def __init__(self, conf):
|
||||
self.host = conf.host
|
||||
self.driver = driver.load_container_driver(
|
||||
conf.container_driver)
|
||||
self.previous_state = {}
|
||||
self.node_tracker = ComputeNodeTracker(self.host, self.driver)
|
||||
super(ContainerStatusSyncPeriodicJob, self).__init__(conf)
|
||||
|
||||
def _filter_containers_on_status_and_host(self, containers):
|
||||
statuses = [consts.CREATING]
|
||||
return filter(
|
||||
lambda container: container.status not in statuses and
|
||||
container.host == self.host, containers)
|
||||
|
||||
def _find_changed_containers(self, current_state):
|
||||
new_containers = list(set(current_state) - set(self.previous_state))
|
||||
deleted_containers = list(set(self.previous_state) -
|
||||
set(current_state))
|
||||
changed_containers = [k for k in set(self.previous_state) &
|
||||
set(current_state)
|
||||
if current_state[k] != self.previous_state[k]]
|
||||
return new_containers + changed_containers, deleted_containers
|
||||
super(ContainerStateSyncPeriodicJob, self).__init__(conf)
|
||||
|
||||
@periodic_task.periodic_task(run_immediately=True)
|
||||
@set_context
|
||||
def sync_container_status(self, ctx):
|
||||
LOG.debug('Update container status start')
|
||||
def sync_container_state(self, ctx):
|
||||
LOG.debug('Start syncing container states.')
|
||||
|
||||
current_state = {container['Id']: container['State']
|
||||
for container in self.driver.list()}
|
||||
containers = objects.Container.list(ctx)
|
||||
self.driver.update_containers_states(ctx, containers)
|
||||
|
||||
changed_containers, deleted_containers = self._find_changed_containers(
|
||||
current_state)
|
||||
if not changed_containers and not deleted_containers:
|
||||
LOG.debug('No container status change from previous state')
|
||||
return
|
||||
|
||||
self.previous_state = current_state
|
||||
all_containers = objects.Container.list(ctx)
|
||||
containers = self._filter_containers_on_status_and_host(all_containers)
|
||||
|
||||
db_containers_map = {container.container_id: container
|
||||
for container in containers}
|
||||
|
||||
for container_id in changed_containers:
|
||||
if db_containers_map.get(container_id):
|
||||
old_status = db_containers_map.get(container_id).status
|
||||
try:
|
||||
updated_container = self.driver.show(
|
||||
db_containers_map.get(container_id))
|
||||
if old_status != updated_container.status:
|
||||
updated_container.save(ctx)
|
||||
msg = 'Status of container %s changed from %s to %s'
|
||||
LOG.info(msg % (updated_container.uuid, old_status,
|
||||
updated_container.status))
|
||||
except Exception as e:
|
||||
LOG.exception("Unexpected exception: %s",
|
||||
six.text_type(e))
|
||||
|
||||
for container_id in deleted_containers:
|
||||
if db_containers_map.get(container_id):
|
||||
try:
|
||||
if ((db_containers_map.get(container_id).task_state !=
|
||||
consts.CONTAINER_DELETING or
|
||||
db_containers_map.get(container_id).task_state !=
|
||||
consts.SANDBOX_DELETING)):
|
||||
old_status = db_containers_map.get(container_id).status
|
||||
updated_container = self.driver.show(
|
||||
db_containers_map.get(container_id))
|
||||
updated_container.save(ctx)
|
||||
msg = 'Status of container %s changed from %s to %s'
|
||||
LOG.info(msg % (updated_container.uuid, old_status,
|
||||
updated_container.status))
|
||||
except Exception as e:
|
||||
LOG.exception("Unexpected exception: %s",
|
||||
six.text_type(e))
|
||||
|
||||
LOG.debug('Update container status end')
|
||||
LOG.debug('Complete syncing container states.')
|
||||
|
||||
@periodic_task.periodic_task(run_immediately=True)
|
||||
@set_context
|
||||
@ -121,7 +57,7 @@ class ContainerStatusSyncPeriodicJob(periodic_task.PeriodicTasks):
|
||||
|
||||
|
||||
def setup(conf, tg):
|
||||
pt = ContainerStatusSyncPeriodicJob(conf)
|
||||
pt = ContainerStateSyncPeriodicJob(conf)
|
||||
tg.add_dynamic_timer(
|
||||
pt.run_periodic_tasks,
|
||||
periodic_interval_max=conf.periodic_interval_max,
|
||||
|
@ -20,7 +20,7 @@ from zun.container.docker.driver import NovaDockerDriver
|
||||
from zun.container.docker import utils as docker_utils
|
||||
from zun import objects
|
||||
from zun.tests.unit.container import base
|
||||
from zun.tests.unit.db import utils as db_utils
|
||||
from zun.tests.unit.objects import utils as obj_utils
|
||||
|
||||
LSCPU_ON = """# The following is the parsable format, which can be fed to other
|
||||
# programs. Each different item in every column has an unique ID
|
||||
@ -48,8 +48,8 @@ class TestDockerDriver(base.DriverTestCase):
|
||||
docker_client = dfc_patcher.start()
|
||||
self.dfc_context_manager = docker_client.return_value
|
||||
self.mock_docker = mock.MagicMock()
|
||||
container_dict = db_utils.create_test_container(context=self.context)
|
||||
self.mock_default_container = mock.MagicMock(**container_dict)
|
||||
self.mock_default_container = obj_utils.get_test_container(
|
||||
self.context)
|
||||
self.dfc_context_manager.__enter__.return_value = self.mock_docker
|
||||
self.addCleanup(dfc_patcher.stop)
|
||||
|
||||
@ -143,11 +143,28 @@ class TestDockerDriver(base.DriverTestCase):
|
||||
|
||||
def test_list(self):
|
||||
self.mock_docker.list_containers.return_value = []
|
||||
self.driver.list()
|
||||
self.driver.list(self.context)
|
||||
self.mock_docker.list_containers.assert_called_once_with()
|
||||
|
||||
@mock.patch('zun.objects.container.Container.save')
|
||||
def test_update_containers_states(self, mock_save):
|
||||
mock_container = obj_utils.get_test_container(
|
||||
self.context, status='Running', host='host1')
|
||||
mock_container_2 = obj_utils.get_test_container(
|
||||
self.context, status='Stopped')
|
||||
conf.CONF.set_override('host', 'host2')
|
||||
with mock.patch.object(self.driver, 'list') as mock_list:
|
||||
mock_list.return_value = [mock_container_2]
|
||||
self.assertEqual(mock_container.host, 'host1')
|
||||
self.assertEqual(mock_container.status, 'Running')
|
||||
self.driver.update_containers_states(
|
||||
self.context, [mock_container])
|
||||
self.assertEqual(mock_container.host, 'host2')
|
||||
self.assertEqual(mock_container.status, 'Stopped')
|
||||
|
||||
def test_show_success(self):
|
||||
self.mock_docker.inspect_container = mock.Mock(return_value={})
|
||||
self.mock_docker.inspect_container = mock.Mock(
|
||||
return_value={'State': 'running'})
|
||||
mock_container = mock.MagicMock()
|
||||
self.driver.show(mock_container)
|
||||
self.mock_docker.inspect_container.assert_called_once_with(
|
||||
@ -257,7 +274,8 @@ class TestDockerDriver(base.DriverTestCase):
|
||||
|
||||
def test_kill_successful_signal_is_none(self):
|
||||
self.mock_docker.kill = mock.Mock()
|
||||
self.mock_docker.inspect_container = mock.Mock(return_value={})
|
||||
self.mock_docker.inspect_container = mock.Mock(
|
||||
return_value={'State': 'exited'})
|
||||
mock_container = mock.MagicMock()
|
||||
self.driver.kill(mock_container, signal=None)
|
||||
self.mock_docker.kill.assert_called_once_with(
|
||||
@ -267,7 +285,8 @@ class TestDockerDriver(base.DriverTestCase):
|
||||
|
||||
def test_kill_successful_signal_is_not_none(self):
|
||||
self.mock_docker.kill = mock.Mock()
|
||||
self.mock_docker.inspect_container = mock.Mock(return_value={})
|
||||
self.mock_docker.inspect_container = mock.Mock(
|
||||
return_value={'State': 'exited'})
|
||||
mock_container = mock.MagicMock()
|
||||
self.driver.kill(mock_container, signal='test')
|
||||
self.mock_docker.kill.assert_called_once_with(
|
||||
@ -434,9 +453,8 @@ class TestNovaDockerDriver(base.DriverTestCase):
|
||||
mock_ensure_active.return_value = True
|
||||
mock_find_container_by_server_name.return_value = \
|
||||
'test_container_name_id'
|
||||
db_container = db_utils.create_test_container(context=self.context,
|
||||
mock_container = obj_utils.get_test_container(self.context,
|
||||
host=conf.CONF.host)
|
||||
mock_container = mock.MagicMock(**db_container)
|
||||
result_sandbox_id = self.driver.create_sandbox(self.context,
|
||||
mock_container)
|
||||
mock_get_sandbox_name.assert_called_once_with(mock_container)
|
||||
|
@ -57,6 +57,18 @@ class TestContainerObject(base.DbTestCase):
|
||||
self.assertIsInstance(containers[0], objects.Container)
|
||||
self.assertEqual(self.context, containers[0]._context)
|
||||
|
||||
def test_list_by_host(self):
|
||||
with mock.patch.object(self.dbapi, 'list_containers',
|
||||
autospec=True) as mock_get_list:
|
||||
mock_get_list.return_value = [self.fake_container]
|
||||
containers = objects.Container.list_by_host(self.context,
|
||||
'test_host')
|
||||
mock_get_list.assert_called_once_with(
|
||||
self.context, {'host': 'test_host'}, None, None, None, None)
|
||||
self.assertThat(containers, HasLength(1))
|
||||
self.assertIsInstance(containers[0], objects.Container)
|
||||
self.assertEqual(self.context, containers[0]._context)
|
||||
|
||||
def test_list_with_filters(self):
|
||||
with mock.patch.object(self.dbapi, 'list_containers',
|
||||
autospec=True) as mock_get_list:
|
||||
|
@ -354,7 +354,7 @@ class TestObject(test_base.TestCase, _TestObject):
|
||||
# For more information on object version testing, read
|
||||
# http://docs.openstack.org/developer/zun/objects.html
|
||||
object_data = {
|
||||
'Container': '1.13-cb1ad0651457fcb2659d779cd801b565',
|
||||
'Container': '1.14-dc705ea9cad87fa291b5fd7c4a9d57f1',
|
||||
'Image': '1.0-0b976be24f4f6ee0d526e5c981ce0633',
|
||||
'MyObj': '1.0-34c4b1aadefd177b13f9a2f894cc23cd',
|
||||
'NUMANode': '1.0-cba878b70b2f8b52f1e031b41ac13b4e',
|
||||
|
Loading…
x
Reference in New Issue
Block a user