Add container status sync periodic job
Implements: blueprint periodic-task

This patch implements a periodic job that syncs the container status
recorded in the database with the actual status reported by the
underlying container driver. Unit tests will be added in subsequent
patches.

Change-Id: I43d8581b0581de5506a4d92f1670c9076a58d25b
parent 07273927ef
commit 3f195afe4e
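The job below rides on oslo.service's periodic-task machinery: a dynamic timer on the service's thread group repeatedly calls run_periodic_tasks(), which runs every due @periodic_task method and returns the delay until the next one. A minimal sketch of that pattern (DemoTasks and heartbeat are invented for illustration; the oslo_service and zun.conf APIs are the real ones this patch uses):

    from oslo_service import periodic_task
    from oslo_service import threadgroup

    import zun.conf

    CONF = zun.conf.CONF


    class DemoTasks(periodic_task.PeriodicTasks):
        @periodic_task.periodic_task(spacing=10, run_immediately=True)
        def heartbeat(self, context):
            # Invoked roughly every 10 seconds by the dynamic timer.
            print('tick')


    tg = threadgroup.ThreadGroup()
    tasks = DemoTasks(CONF)
    # The timer reschedules itself from run_periodic_tasks' return
    # value, capped by periodic_interval_max.
    tg.add_dynamic_timer(tasks.run_periodic_tasks,
                         periodic_interval_max=CONF.periodic_interval_max,
                         context=None)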
@@ -21,6 +21,7 @@ from oslo_service import service
 from zun.common import rpc
 import zun.conf
 from zun.objects import base as objects_base
+from zun.service import periodic
 from zun.servicegroup import zun_service_periodic as servicegroup

 # NOTE(paulczar):
@@ -48,6 +49,7 @@ class Service(service.Service):

     def start(self):
         servicegroup.setup(CONF, self.binary, self.tg)
+        periodic.setup(CONF, self.tg)
         self._server.start()

     def stop(self):
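periodic.setup() registers the sync job on the same thread group that already carries the servicegroup heartbeat, so the job lives and dies with the service process. A hedged way to observe the effect (the assertions assume a fresh ThreadGroup and a loadable container_driver; this snippet is not part of the commit):

    from oslo_service import threadgroup

    import zun.conf
    from zun.service import periodic

    CONF = zun.conf.CONF

    tg = threadgroup.ThreadGroup()
    assert len(tg.timers) == 0
    periodic.setup(CONF, tg)
    # One new dynamic timer now pumps the container status sync.
    assert len(tg.timers) == 1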
@@ -116,7 +116,8 @@ class DockerDriver(driver.ContainerDriver):

     def list(self):
         with docker_utils.docker_client() as docker:
-            return docker.list_instances()
+            return [container for container in docker.list_containers()
+                    if 'zun-sandbox-' not in container['Names'][0]]

     def show(self, container):
         with docker_utils.docker_client() as docker:
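list() now hides Zun's sandbox (infra) containers from status sync by filtering on the container name. A small worked example of the rule, assuming the usual Docker /containers/json shape where each name carries a leading slash (the sample values are illustrative):

    containers = [
        {'Id': 'a1', 'Names': ['/zun-sandbox-uuid1'], 'State': 'running'},
        {'Id': 'b2', 'Names': ['/zun-uuid1'], 'State': 'exited'},
    ]
    visible = [c for c in containers
               if 'zun-sandbox-' not in c['Names'][0]]
    assert [c['Id'] for c in visible] == ['b2']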
@@ -93,6 +93,9 @@ class DockerHTTPClient(client.Client):
             res.append(info['Config'].get('Hostname'))
         return res

+    def list_containers(self):
+        return self.containers(all=True)
+
     def pause(self, container):
         """Pause a running container."""
         if isinstance(container, objects.Container):
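The helper wraps docker-py's Client.containers(); all=True is what makes stopped and exited containers visible, which the sync job needs in order to notice state transitions. A hedged usage sketch (docker_utils is the module the driver hunk above already imports; its exact path is assumed here):

    with docker_utils.docker_client() as docker:
        for c in docker.list_containers():
            # Entries follow the Docker /containers/json shape, e.g.
            # {'Id': '...', 'Names': ['/zun-...'], 'State': 'running'}
            print(c['Id'], c['State'])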
@@ -12,13 +12,111 @@
 # limitations under the License.

 import functools
+import six
+
+from oslo_log import log
+from oslo_service import periodic_task

 from zun.common import context
+from zun.common.i18n import _LE
+from zun.container import driver
+from zun import objects
+from zun.objects import fields
+
+LOG = log.getLogger(__name__)


 def set_context(func):
     @functools.wraps(func)
     def handler(self, ctx):
-        ctx = context.get_admin_context()
+        ctx = context.get_admin_context(all_tenants=True)
         func(self, ctx)
     return handler
+
+
+class ContainerStatusSyncPeriodicJob(periodic_task.PeriodicTasks):
+    def __init__(self, conf):
+        self.host = conf.host
+        self.driver = driver.load_container_driver(
+            conf.container_driver)
+        self.previous_state = {}
+        super(ContainerStatusSyncPeriodicJob, self).__init__(conf)
+
+    def _filter_containers_on_status_and_host(self, containers):
+        statuses = [fields.ContainerStatus.CREATING]
+        return filter(
+            lambda container: container.status not in statuses and
+            container.host == self.host, containers)
+
+    def _find_changed_containers(self, current_state):
+        new_containers = list(set(current_state) - set(self.previous_state))
+        deleted_containers = list(set(self.previous_state) -
+                                  set(current_state))
+        changed_containers = [k for k in set(self.previous_state) &
+                              set(current_state)
+                              if current_state[k] != self.previous_state[k]]
+        return new_containers + changed_containers, deleted_containers
+
+    @periodic_task.periodic_task(run_immediately=True)
+    @set_context
+    def sync_container_status(self, ctx):
+        LOG.debug('Update container status start')
+
+        current_state = {container['Id']: container['State']
+                         for container in self.driver.list()}
+
+        changed_containers, deleted_containers = self._find_changed_containers(
+            current_state)
+        if not changed_containers and not deleted_containers:
+            LOG.debug('No container status change from previous state')
+            return
+
+        self.previous_state = current_state
+        all_containers = objects.Container.list(ctx)
+        containers = self._filter_containers_on_status_and_host(all_containers)
+
+        db_containers_map = {container.container_id: container
+                             for container in containers}
+
+        for container_id in changed_containers:
+            if db_containers_map.get(container_id):
+                old_status = db_containers_map.get(container_id).status
+                try:
+                    updated_container = self.driver.show(
+                        db_containers_map.get(container_id))
+                    if old_status != updated_container.status:
+                        updated_container.save(ctx)
+                        msg = 'Status of container %s changed from %s to %s'
+                        LOG.info(msg % (updated_container.uuid, old_status,
+                                        updated_container.status))
+                except Exception as e:
+                    LOG.exception(_LE("Unexpected exception: %s"),
+                                  six.text_type(e))
+
+        for container_id in deleted_containers:
+            if db_containers_map.get(container_id):
+                try:
+                    if ((db_containers_map.get(container_id).task_state !=
+                            fields.TaskState.CONTAINER_DELETING or
+                            db_containers_map.get(container_id).task_state !=
+                            fields.TaskState.SANDBOX_DELETING)):
+                        old_status = db_containers_map.get(container_id).status
+                        updated_container = self.driver.show(
+                            db_containers_map.get(container_id))
+                        updated_container.save(ctx)
+                        msg = 'Status of container %s changed from %s to %s'
+                        LOG.info(msg % (updated_container.uuid, old_status,
+                                        updated_container.status))
+                except Exception as e:
+                    LOG.exception(_LE("Unexpected exception: %s"),
+                                  six.text_type(e))
+
+        LOG.debug('Update container status end')
+
+
+def setup(conf, tg):
+    pt = ContainerStatusSyncPeriodicJob(conf)
+    tg.add_dynamic_timer(
+        pt.run_periodic_tasks,
+        periodic_interval_max=conf.periodic_interval_max,
+        context=None)
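The diffing helper keys on container IDs and compares state strings; a worked example with illustrative values:

    previous = {'a': 'running', 'b': 'running', 'c': 'exited'}
    current = {'a': 'running', 'b': 'exited', 'd': 'running'}
    # new:     only in current          -> ['d']
    # deleted: only in previous         -> ['c']
    # changed: in both, different state -> ['b']
    # sync then re-inspects ['d', 'b'] and handles ['c'] as deleted.

One tentative reviewer observation: in the deleted-containers loop, the guard (task_state != CONTAINER_DELETING or task_state != SANDBOX_DELETING) is always true, since a single value cannot equal both constants at once, so containers being deliberately deleted are not actually skipped. If skipping them was the intent, the condition presumably wants `and`:

    if (db_containers_map.get(container_id).task_state !=
            fields.TaskState.CONTAINER_DELETING and
            db_containers_map.get(container_id).task_state !=
            fields.TaskState.SANDBOX_DELETING):
        ...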
@@ -142,9 +142,9 @@ class TestDockerDriver(base.DriverTestCase):
         self.assertEqual(1, mock_init.call_count)

     def test_list(self):
-        self.mock_docker.list_instances = mock.Mock()
+        self.mock_docker.list_containers.return_value = []
         self.driver.list()
-        self.mock_docker.list_instances.assert_called_once_with()
+        self.mock_docker.list_containers.assert_called_once_with()

     def test_show_success(self):
         self.mock_docker.inspect_container = mock.Mock(return_value={})
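Since the commit message defers unit tests for the job itself to follow-up patches, here is a hedged sketch of one (the test body is an assumption; the import path comes from this diff; __new__ is used to skip __init__ so no container driver gets loaded):

    from zun.service.periodic import ContainerStatusSyncPeriodicJob


    def test_find_changed_containers():
        job = ContainerStatusSyncPeriodicJob.__new__(
            ContainerStatusSyncPeriodicJob)
        job.previous_state = {'a': 'running', 'b': 'running'}
        changed, deleted = job._find_changed_containers(
            {'a': 'exited', 'c': 'running'})
        assert sorted(changed) == ['a', 'c']  # 'c' is new, 'a' changed
        assert deleted == ['b']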