Merge "Use a task model and dumb workers"

This commit is contained in:
Jenkins 2016-06-03 05:26:01 +00:00 committed by Gerrit Code Review
commit 98268db8bb
3 changed files with 214 additions and 89 deletions

View File

@ -48,6 +48,7 @@ if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT) sys.path.insert(0, PROJECT_ROOT)
from kolla.common import config as common_config from kolla.common import config as common_config
from kolla.common import task
from kolla import version from kolla import version
logging.basicConfig() logging.basicConfig()
@ -89,33 +90,56 @@ def docker_client():
sys.exit(1) sys.exit(1)
class PushThread(threading.Thread): class PushIntoQueueTask(task.Task):
"""Task that pushes some other task into a queue."""
def __init__(self, conf, queue): def __init__(self, push_task, push_queue):
super(PushThread, self).__init__() super(PushIntoQueueTask, self).__init__()
self.setDaemon(True) self.push_task = push_task
self.conf = conf self.push_queue = push_queue
self.queue = queue
self.dc = docker_client() @property
def name(self):
return 'PushIntoQueueTask(%s=>%s)' % (self.push_task.name,
self.push_queue)
def run(self): def run(self):
while True: self.push_queue.put(self.push_task)
try: self.success = True
image = self.queue.get()
LOG.debug('%s:Try to push the image', image['name'])
self.push_image(image) class PushTask(task.Task):
except requests_exc.ConnectionError: """Task that pushes a image to a docker repository."""
LOG.exception('%s:Make sure Docker is running and that you'
' have the correct privileges to run Docker' def __init__(self, conf, image):
' (root)', image['name']) super(PushTask, self).__init__()
image['status'] = "connection_error" self.dc = docker_client()
except Exception: self.conf = conf
LOG.exception('%s:Unknown error when pushing', image['name']) self.image = image
image['status'] = "push_error"
finally: @property
if "error" not in image['status']: def name(self):
LOG.info('%s:Pushed successfully', image['name']) return 'PushTask(%s)' % self.image['name']
self.queue.task_done()
def run(self):
image = self.image
try:
LOG.debug('%s:Try to push the image', image['name'])
self.push_image(image)
except requests_exc.ConnectionError:
LOG.exception('%s:Make sure Docker is running and that you'
' have the correct privileges to run Docker'
' (root)', image['name'])
image['status'] = "connection_error"
except Exception:
LOG.exception('%s:Unknown error when pushing', image['name'])
image['status'] = "push_error"
finally:
if "error" not in image['status']:
LOG.info('%s:Pushed successfully', image['name'])
self.success = True
else:
self.success = False
def push_image(self, image): def push_image(self, image):
image['push_logs'] = str() image['push_logs'] = str()
@ -133,45 +157,42 @@ class PushThread(threading.Thread):
LOG.error(stream['errorDetail']['message']) LOG.error(stream['errorDetail']['message'])
class WorkerThread(threading.Thread): class BuildTask(task.Task):
"""Task that builds out an image."""
def __init__(self, queue, push_queue, conf): def __init__(self, conf, image, push_queue):
super(BuildTask, self).__init__()
self.conf = conf self.conf = conf
self.queue = queue self.image = image
self.dc = docker_client()
self.push_queue = push_queue self.push_queue = push_queue
self.nocache = not conf.cache or conf.no_cache self.nocache = not conf.cache or conf.no_cache
self.forcerm = not conf.keep self.forcerm = not conf.keep
self.dc = docker_client()
super(WorkerThread, self).__init__()
def end_task(self, image): @property
"""Properly inform the queue we are finished""" def name(self):
# No matter whether the parent failed or not, we still process return 'BuildTask(%s)' % self.image['name']
# the children. We have the code in place to catch a parent in
# an 'error' status
for child in image['children']:
self.queue.put(child)
LOG.debug('%s:Added image to queue', child['name'])
self.queue.task_done()
LOG.debug('%s:Processed', image['name'])
def run(self): def run(self):
"""Executes tasks until the queue is empty""" self.builder(self.image)
while True: if self.image['status'] == 'built':
try: self.success = True
image = self.queue.get()
for _ in six.moves.range(self.conf.retries + 1): @property
self.builder(image) def followups(self):
if image['status'] in ['built', 'unmatched', followups = []
'parent_error']: if self.conf.push and self.success:
break followups.extend([
except requests_exc.ConnectionError: # If we are supposed to push the image into a docker
LOG.exception('Make sure Docker is running and that you' # repository, then make sure we do that...
' have the correct privileges to run Docker' PushIntoQueueTask(
' (root)') PushTask(self.conf, self.image),
image['status'] = "connection_error" self.push_queue),
break ])
self.end_task(image) if self.image['children'] and self.success:
for image in self.image['children']:
followups.append(BuildTask(self.conf, image, self.push_queue))
return followups
def process_source(self, image, source): def process_source(self, image, source):
dest_archive = os.path.join(image['path'], source['name'] + '-archive') dest_archive = os.path.join(image['path'], source['name'] + '-archive')
@ -337,8 +358,50 @@ class WorkerThread(threading.Thread):
image['status'] = "built" image['status'] = "built"
LOG.info('%s:Built', image['name']) LOG.info('%s:Built', image['name'])
if self.conf.push:
self.push_queue.put(image)
class WorkerThread(threading.Thread):
"""Thread that executes tasks until the queue provides a tombstone."""
#: Object to be put on worker queues to get them to die.
tombstone = object()
def __init__(self, conf, queue):
super(WorkerThread, self).__init__()
self.queue = queue
self.conf = conf
def run(self):
while True:
task = self.queue.get()
if task is self.tombstone:
# Ensure any other threads also get the tombstone.
self.queue.put(task)
break
try:
for attempt in six.moves.range(self.conf.retries + 1):
if attempt > 0:
LOG.debug("Attempting to run task %s for the %s time",
task.name, attempt + 1)
else:
LOG.debug("Attempting to run task %s for the first"
" time", task.name)
try:
task.run()
if task.success:
break
except Exception:
LOG.exception('Unhandled error when running %s',
task.name)
# try again...
task.reset()
if task.success:
for next_task in task.followups:
LOG.debug('Added next task %s to queue',
next_task.name)
self.queue.put(next_task)
finally:
self.queue.task_done()
class KollaWorker(object): class KollaWorker(object):
@ -715,7 +778,7 @@ class KollaWorker(object):
parent['children'].append(image) parent['children'].append(image)
image['parent'] = parent image['parent'] = parent
def build_queue(self): def build_queue(self, push_queue):
"""Organizes Queue list """Organizes Queue list
Return a list of Queues that have been organized into a hierarchy Return a list of Queues that have been organized into a hierarchy
@ -729,7 +792,7 @@ class KollaWorker(object):
for image in self.images: for image in self.images:
if image['parent'] is None: if image['parent'] is None:
queue.put(image) queue.put(BuildTask(self.conf, image, push_queue))
LOG.debug('%s:Added image to queue', image['name']) LOG.debug('%s:Added image to queue', image['name'])
return queue return queue
@ -760,9 +823,6 @@ def run_build():
# to work like we want. A different size or hash will still force a rebuild # to work like we want. A different size or hash will still force a rebuild
kolla.set_time() kolla.set_time()
queue = kolla.build_queue()
push_queue = six.moves.queue.Queue()
if conf.save_dependency: if conf.save_dependency:
kolla.save_dependency(conf.save_dependency) kolla.save_dependency(conf.save_dependency)
LOG.info('Docker images dependency is saved in %s', LOG.info('Docker images dependency is saved in %s',
@ -775,14 +835,20 @@ def run_build():
kolla.list_dependencies() kolla.list_dependencies()
return return
push_queue = six.moves.queue.Queue()
queue = kolla.build_queue(push_queue)
workers = []
for x in six.moves.range(conf.threads): for x in six.moves.range(conf.threads):
worker = WorkerThread(queue, push_queue, conf) worker = WorkerThread(conf, queue)
worker.setDaemon(True) worker.setDaemon(True)
worker.start() worker.start()
workers.append(worker)
for x in six.moves.range(conf.push_threads): for x in six.moves.range(conf.push_threads):
push_thread = PushThread(conf, push_queue) worker = WorkerThread(conf, push_queue)
push_thread.start() worker.start()
workers.append(worker)
# sleep until queue is empty # sleep until queue is empty
while queue.unfinished_tasks or push_queue.unfinished_tasks: while queue.unfinished_tasks or push_queue.unfinished_tasks:
@ -791,6 +857,12 @@ def run_build():
kolla.summary() kolla.summary()
kolla.cleanup() kolla.cleanup()
# ensure all threads exited happily
queue.put(WorkerThread.tombstone)
push_queue.put(WorkerThread.tombstone)
for w in workers:
w.join()
return kolla.get_image_statuses() return kolla.get_image_statuses()

42
kolla/common/task.py Normal file
View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Task(object):
def __init__(self):
self.success = False
@abc.abstractproperty
def name(self):
pass
def reset(self):
self.success = False
@property
def followups(self):
return []
@abc.abstractmethod
def run(self):
pass
@staticmethod
def set_status(status):
# TODO(harlowja): remove this.
pass

View File

@ -31,29 +31,36 @@ FAKE_IMAGE = {
} }
class WorkerThreadTest(base.TestCase): class TasksTest(base.TestCase):
def setUp(self): def setUp(self):
super(WorkerThreadTest, self).setUp() super(TasksTest, self).setUp()
self.image = FAKE_IMAGE.copy() self.image = FAKE_IMAGE.copy()
# NOTE(jeffrey4l): use a real, temporary dir # NOTE(jeffrey4l): use a real, temporary dir
self.image['path'] = self.useFixture(fixtures.TempDir()).path self.image['path'] = self.useFixture(fixtures.TempDir()).path
@mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client')
def test_push_image(self, mock_client):
pusher = build.PushTask(self.conf, self.image)
pusher.run()
mock_client().push.assert_called_once_with(
self.image['fullname'], stream=True, insecure_registry=True)
@mock.patch.dict(os.environ, clear=True) @mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_image(self, mock_client): def test_build_image(self, mock_client):
queue = mock.Mock()
push_queue = mock.Mock() push_queue = mock.Mock()
worker = build.WorkerThread(queue, builder = build.BuildTask(self.conf, self.image, push_queue)
push_queue, builder.run()
self.conf)
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=None) buildargs=None)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, clear=True) @mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_image_with_build_arg(self, mock_client): def test_build_image_with_build_arg(self, mock_client):
@ -62,33 +69,35 @@ class WorkerThreadTest(base.TestCase):
'NO_PROXY': '127.0.0.1' 'NO_PROXY': '127.0.0.1'
} }
self.conf.set_override('build_args', build_args) self.conf.set_override('build_args', build_args)
worker = build.WorkerThread(mock.Mock(), push_queue = mock.Mock()
mock.Mock(), builder = build.BuildTask(self.conf, self.image, push_queue)
self.conf) builder.run()
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'},
clear=True) clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_arg_from_env(self, mock_client): def test_build_arg_from_env(self, mock_client):
push_queue = mock.Mock()
build_args = { build_args = {
'http_proxy': 'http://FROM_ENV:8080', 'http_proxy': 'http://FROM_ENV:8080',
} }
worker = build.WorkerThread(mock.Mock(), builder = build.BuildTask(self.conf, self.image, push_queue)
mock.Mock(), builder.run()
self.conf)
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'},
clear=True) clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
@ -97,30 +106,30 @@ class WorkerThreadTest(base.TestCase):
'http_proxy': 'http://localhost:8080', 'http_proxy': 'http://localhost:8080',
} }
self.conf.set_override('build_args', build_args) self.conf.set_override('build_args', build_args)
worker = build.WorkerThread(mock.Mock(),
mock.Mock(), push_queue = mock.Mock()
self.conf) builder = build.BuildTask(self.conf, self.image, push_queue)
worker.builder(self.image) builder.run()
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch('docker.Client') @mock.patch('docker.Client')
@mock.patch('requests.get') @mock.patch('requests.get')
def test_requests_get_timeout(self, mock_get, mock_client): def test_requests_get_timeout(self, mock_get, mock_client):
worker = build.WorkerThread(mock.Mock(),
mock.Mock(),
self.conf)
self.image['source'] = { self.image['source'] = {
'source': 'http://fake/source', 'source': 'http://fake/source',
'type': 'url', 'type': 'url',
'name': 'fake-image-base' 'name': 'fake-image-base'
} }
push_queue = mock.Mock()
builder = build.BuildTask(self.conf, self.image, push_queue)
mock_get.side_effect = requests.exceptions.Timeout mock_get.side_effect = requests.exceptions.Timeout
get_result = worker.process_source(self.image, get_result = builder.process_source(self.image, self.image['source'])
self.image['source'])
self.assertIsNone(get_result) self.assertIsNone(get_result)
self.assertEqual(self.image['status'], 'error') self.assertEqual(self.image['status'], 'error')
@ -128,6 +137,8 @@ class WorkerThreadTest(base.TestCase):
mock_get.assert_called_once_with(self.image['source']['source'], mock_get.assert_called_once_with(self.image['source']['source'],
timeout=120) timeout=120)
self.assertFalse(builder.success)
class KollaWorkerTest(base.TestCase): class KollaWorkerTest(base.TestCase):