Make container creation aysnc

Creating a container might take a while, which cause a RPC timeout
error. Therefore, let's make it aysnc.

Change-Id: I681ebce93868d90769020802b0eb8eeea11be083
This commit is contained in:
Hongbin Lu 2016-08-14 22:22:29 -05:00
parent b0d5dfce6c
commit ea11c8e37c
12 changed files with 146 additions and 33 deletions

View File

@ -85,6 +85,13 @@ class Container(base.APIBase):
'max_length': 255,
},
},
'task_state': {
'validate': types.String.validate,
'validate_args': {
'min_length': 0,
'max_length': 255,
},
},
'memory': {
'validate': types.String.validate,
'validate_args': {
@ -103,9 +110,9 @@ class Container(base.APIBase):
@staticmethod
def _convert_with_links(container, url, expand=True):
if not expand:
container.unset_fields_except(['uuid', 'name',
'image', 'command', 'status',
'memory', 'environment'])
container.unset_fields_except([
'uuid', 'name', 'image', 'command', 'status', 'memory',
'environment', 'task_state'])
container.links = [link.Link.make_link(
'self', url,
@ -385,16 +392,16 @@ class ContainersController(object):
container_dict = Container(**container_dict).as_dict()
container_dict['project_id'] = context.project_id
container_dict['user_id'] = context.user_id
container_dict['status'] = fields.ContainerStatus.CREATING
new_container = objects.Container(context, **container_dict)
new_container.create()
res_container = pecan.request.rpcapi.container_create(context,
new_container)
pecan.request.rpcapi.container_create(context, new_container)
# Set the HTTP Location Header
pecan.response.location = link.build_url('containers',
res_container.uuid)
pecan.response.status = 201
return Container.convert_with_links(res_container)
new_container.uuid)
pecan.response.status = 202
return Container.convert_with_links(new_container)
class ContainerController(object):

View File

@ -15,9 +15,12 @@
# recommendations from http://docs.openstack.org/developer/oslo.i18n/usage.html
"""Utilities and helper functions."""
import eventlet
import functools
import mimetypes
import uuid
from oslo_context import context as common_context
from oslo_log import log as logging
import pecan
import six
@ -67,3 +70,26 @@ def allow_all_content_types(f):
def generate_uuid():
return str(uuid.uuid4())
def spawn_n(func, *args, **kwargs):
"""Passthrough method for eventlet.spawn_n.
This utility exists so that it can be stubbed for testing without
interfering with the service spawns.
It will also grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
"""
_context = common_context.get_current()
@functools.wraps(func)
def context_wrapper(*args, **kwargs):
# NOTE: If update_store is not called after spawn_n it won't be
# available for the logger to pull from threadlocal storage.
if _context is not None:
_context.update_store()
func(*args, **kwargs)
eventlet.spawn_n(context_wrapper, *args, **kwargs)

View File

@ -34,7 +34,7 @@ class API(rpc_service.API):
transport, context, topic=cfg.CONF.compute.topic)
def container_create(self, context, container):
return self._call('container_create', container=container)
return self._cast('container_create', container=container)
def container_delete(self, context, container):
return self._call('container_delete', container=container)

View File

@ -16,7 +16,9 @@ from oslo_log import log as logging
from zun.common import exception
from zun.common.i18n import _LE
from zun.common import utils
from zun.container import driver
from zun.objects import fields
LOG = logging.getLogger(__name__)
@ -30,16 +32,35 @@ class Manager(object):
self.driver = driver.load_container_driver(container_driver)
def container_create(self, context, container):
utils.spawn_n(self._do_container_create, context, container)
def _do_container_create(self, context, container):
LOG.debug('Creating container...', context=context,
container=container)
container.task_state = fields.TaskState.IMAGE_PULLING
container.save()
try:
self.driver.pull_image(container.image)
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s"), str(e))
container.status = fields.ContainerStatus.ERROR
container.task_state = None
container.save()
return
container.task_state = fields.TaskState.CONTAINER_CREATING
container.save()
try:
container = self.driver.create(container)
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
container.status = fields.ContainerStatus.ERROR
finally:
container.task_state = None
container.save()
def container_delete(self, context, container):
LOG.debug('Deleting container...', context=context,
@ -48,7 +69,7 @@ class Manager(object):
self.driver.delete(container)
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -58,7 +79,7 @@ class Manager(object):
try:
return self.driver.list()
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -71,7 +92,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -84,7 +105,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -97,7 +118,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -110,7 +131,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -123,7 +144,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -136,7 +157,7 @@ class Manager(object):
container.save()
return container
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -147,7 +168,7 @@ class Manager(object):
try:
return self.driver.show_logs(container)
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e
@ -159,7 +180,7 @@ class Manager(object):
try:
return self.driver.execute(container, command)
except Exception as e:
LOG.exception(_LE("Unexpected exception: %s,"), str(e))
LOG.exception(_LE("Unexpected exception: %s"), str(e))
if not isinstance(e, exception.ZunException):
e = exception.ZunException("Unexpected Error: %s" % str(e))
raise e

View File

@ -33,6 +33,12 @@ class DockerDriver(driver.ContainerDriver):
def __init__(self):
super(DockerDriver, self).__init__()
def pull_image(self, image):
with docker_utils.docker_client() as docker:
LOG.debug('Pulling image %s' % image)
image_repo, image_tag = docker_utils.parse_docker_image(image)
docker.pull(image_repo, tag=image_tag)
def create(self, container):
with docker_utils.docker_client() as docker:
name = container.name
@ -40,8 +46,6 @@ class DockerDriver(driver.ContainerDriver):
LOG.debug('Creating container with image %s name %s'
% (image, name))
try:
image_repo, image_tag = docker_utils.parse_docker_image(image)
docker.pull(image_repo, tag=image_tag)
kwargs = {'name': name,
'hostname': container.uuid,
'command': container.command,

View File

@ -54,7 +54,7 @@ def parse_docker_image(image):
image_parts = image.split(':', 1)
image_repo = image_parts[0]
image_tag = None
image_tag = 'latest'
if len(image_parts) > 1:
image_tag = image_parts[1]

View File

@ -0,0 +1,35 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add task state to container
Revision ID: 63a08e32cc43
Revises: 93fbb05b77b9
Create Date: 2016-08-14 20:10:04.038358
"""
# revision identifiers, used by Alembic.
revision = '63a08e32cc43'
down_revision = '93fbb05b77b9'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('container',
sa.Column('task_state', sa.String(length=20),
nullable=True))

View File

@ -131,4 +131,5 @@ class Container(Base):
command = Column(String(255))
memory = Column(String(255))
status = Column(String(20))
task_state = Column(String(20))
environment = Column(JSONEncodedDict)

View File

@ -23,7 +23,8 @@ class Container(base.ZunPersistentObject, base.ZunObject,
# Version 1.0: Initial version
# Version 1.1: Add container_id column
# Version 1.2: Add memory column
VERSION = '1.2'
# Version 1.3: Add task_state column
VERSION = '1.3'
dbapi = dbapi.get_instance()
@ -38,6 +39,7 @@ class Container(base.ZunPersistentObject, base.ZunObject,
'memory': fields.StringField(nullable=True),
'command': fields.StringField(nullable=True),
'status': z_fields.ContainerStatusField(nullable=True),
'task_state': z_fields.TaskStateField(nullable=True),
'environment': fields.DictOfStringsField(nullable=True),
}

View File

@ -15,9 +15,9 @@ from oslo_versionedobjects import fields
class ContainerStatus(fields.Enum):
ALL = (
ERROR, RUNNING, STOPPED, PAUSED, UNKNOWN,
ERROR, RUNNING, STOPPED, PAUSED, UNKNOWN, CREATING,
) = (
'Error', 'Running', 'Stopped', 'Paused', 'Unknown',
'Error', 'Running', 'Stopped', 'Paused', 'Unknown', 'Creating',
)
def __init__(self):
@ -27,3 +27,19 @@ class ContainerStatus(fields.Enum):
class ContainerStatusField(fields.BaseEnumField):
AUTO_TYPE = ContainerStatus()
class TaskState(fields.Enum):
ALL = (
IMAGE_PULLING, CONTAINER_CREATING,
) = (
'image_pulling', 'container_creating',
)
def __init__(self):
super(TaskState, self).__init__(
valid_values=TaskState.ALL)
class TaskStateField(fields.BaseEnumField):
AUTO_TYPE = TaskState()

View File

@ -34,7 +34,7 @@ class TestContainerController(api_base.FunctionalTest):
params=params,
content_type='application/json')
self.assertEqual(201, response.status_int)
self.assertEqual(202, response.status_int)
self.assertTrue(mock_container_create.called)
@patch('zun.compute.api.API.container_create')
@ -68,7 +68,7 @@ class TestContainerController(api_base.FunctionalTest):
response = self.app.post('/v1/containers/',
params=params,
content_type='application/json')
self.assertEqual(201, response.status_int)
self.assertEqual(202, response.status_int)
# get all containers
container = objects.Container.list(self.context)[0]
container.status = 'Stopped'
@ -107,7 +107,7 @@ class TestContainerController(api_base.FunctionalTest):
response = self.app.post('/v1/containers/',
params=params,
content_type='application/json')
self.assertEqual(201, response.status_int)
self.assertEqual(202, response.status_int)
# get all containers
container = objects.Container.list(self.context)[0]
container.status = 'Stopped'
@ -136,7 +136,7 @@ class TestContainerController(api_base.FunctionalTest):
response = self.app.post('/v1/containers/',
params=params,
content_type='application/json')
self.assertEqual(201, response.status_int)
self.assertEqual(202, response.status_int)
# get all containers
container = objects.Container.list(self.context)[0]
container.status = 'Stopped'
@ -164,7 +164,7 @@ class TestContainerController(api_base.FunctionalTest):
response = self.app.post('/v1/containers/',
params=params,
content_type='application/json')
self.assertEqual(201, response.status_int)
self.assertEqual(202, response.status_int)
# get all containers
container = objects.Container.list(self.context)[0]
container.status = 'Stopped'

View File

@ -28,6 +28,7 @@ def get_test_container(**kw):
'updated_at': kw.get('updated_at'),
'command': kw.get('command', 'fake_command'),
'status': kw.get('state', 'Running'),
'task_state': kw.get('task_state', 'container_creating'),
'environment': kw.get('environment', {'key1': 'val1', 'key2': 'val2'}),
'memory': kw.get('memory', '512m'),
}