Use a task model and dumb workers

Instead of having smart workers that need to know
about the full logic of how they operate and what
they will do (which makes it hard to reuse parts of
those workers), switch the model so that workers are
dumb and just run tasks, which themselves can
request additional tasks to run when successful.

This makes it more consistent when tasks are to
be retried and also makes it easier to know which
task is running and when.

Less specialized workers == good.

Closes-Bug: #1586474

Change-Id: Ie7b7303b8bf2a80b3e26b6d4ffc27787f8c794d6
This commit is contained in:
Joshua Harlow 2016-05-25 18:53:11 -07:00
parent 23d566a5e3
commit e78ae9bc61
3 changed files with 214 additions and 89 deletions

View File

@ -48,6 +48,7 @@ if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT) sys.path.insert(0, PROJECT_ROOT)
from kolla.common import config as common_config from kolla.common import config as common_config
from kolla.common import task
from kolla import version from kolla import version
logging.basicConfig() logging.basicConfig()
@ -89,19 +90,40 @@ def docker_client():
sys.exit(1) sys.exit(1)
class PushThread(threading.Thread): class PushIntoQueueTask(task.Task):
"""Task that pushes some other task into a queue."""
def __init__(self, conf, queue): def __init__(self, push_task, push_queue):
super(PushThread, self).__init__() super(PushIntoQueueTask, self).__init__()
self.setDaemon(True) self.push_task = push_task
self.conf = conf self.push_queue = push_queue
self.queue = queue
self.dc = docker_client() @property
def name(self):
return 'PushIntoQueueTask(%s=>%s)' % (self.push_task.name,
self.push_queue)
def run(self): def run(self):
while True: self.push_queue.put(self.push_task)
self.success = True
class PushTask(task.Task):
"""Task that pushes a image to a docker repository."""
def __init__(self, conf, image):
super(PushTask, self).__init__()
self.dc = docker_client()
self.conf = conf
self.image = image
@property
def name(self):
return 'PushTask(%s)' % self.image['name']
def run(self):
image = self.image
try: try:
image = self.queue.get()
LOG.debug('%s:Try to push the image', image['name']) LOG.debug('%s:Try to push the image', image['name'])
self.push_image(image) self.push_image(image)
except requests_exc.ConnectionError: except requests_exc.ConnectionError:
@ -115,7 +137,9 @@ class PushThread(threading.Thread):
finally: finally:
if "error" not in image['status']: if "error" not in image['status']:
LOG.info('%s:Pushed successfully', image['name']) LOG.info('%s:Pushed successfully', image['name'])
self.queue.task_done() self.success = True
else:
self.success = False
def push_image(self, image): def push_image(self, image):
image['push_logs'] = str() image['push_logs'] = str()
@ -133,45 +157,42 @@ class PushThread(threading.Thread):
LOG.error(stream['errorDetail']['message']) LOG.error(stream['errorDetail']['message'])
class WorkerThread(threading.Thread): class BuildTask(task.Task):
"""Task that builds out an image."""
def __init__(self, queue, push_queue, conf): def __init__(self, conf, image, push_queue):
super(BuildTask, self).__init__()
self.conf = conf self.conf = conf
self.queue = queue self.image = image
self.dc = docker_client()
self.push_queue = push_queue self.push_queue = push_queue
self.nocache = not conf.cache or conf.no_cache self.nocache = not conf.cache or conf.no_cache
self.forcerm = not conf.keep self.forcerm = not conf.keep
self.dc = docker_client()
super(WorkerThread, self).__init__()
def end_task(self, image): @property
"""Properly inform the queue we are finished""" def name(self):
# No matter whether the parent failed or not, we still process return 'BuildTask(%s)' % self.image['name']
# the children. We have the code in place to catch a parent in
# an 'error' status
for child in image['children']:
self.queue.put(child)
LOG.debug('%s:Added image to queue', child['name'])
self.queue.task_done()
LOG.debug('%s:Processed', image['name'])
def run(self): def run(self):
"""Executes tasks until the queue is empty""" self.builder(self.image)
while True: if self.image['status'] == 'built':
try: self.success = True
image = self.queue.get()
for _ in six.moves.range(self.conf.retries + 1): @property
self.builder(image) def followups(self):
if image['status'] in ['built', 'unmatched', followups = []
'parent_error']: if self.conf.push and self.success:
break followups.extend([
except requests_exc.ConnectionError: # If we are supposed to push the image into a docker
LOG.exception('Make sure Docker is running and that you' # repository, then make sure we do that...
' have the correct privileges to run Docker' PushIntoQueueTask(
' (root)') PushTask(self.conf, self.image),
image['status'] = "connection_error" self.push_queue),
break ])
self.end_task(image) if self.image['children'] and self.success:
for image in self.image['children']:
followups.append(BuildTask(self.conf, image, self.push_queue))
return followups
def process_source(self, image, source): def process_source(self, image, source):
dest_archive = os.path.join(image['path'], source['name'] + '-archive') dest_archive = os.path.join(image['path'], source['name'] + '-archive')
@ -337,8 +358,50 @@ class WorkerThread(threading.Thread):
image['status'] = "built" image['status'] = "built"
LOG.info('%s:Built', image['name']) LOG.info('%s:Built', image['name'])
if self.conf.push:
self.push_queue.put(image)
class WorkerThread(threading.Thread):
"""Thread that executes tasks until the queue provides a tombstone."""
#: Object to be put on worker queues to get them to die.
tombstone = object()
def __init__(self, conf, queue):
super(WorkerThread, self).__init__()
self.queue = queue
self.conf = conf
def run(self):
while True:
task = self.queue.get()
if task is self.tombstone:
# Ensure any other threads also get the tombstone.
self.queue.put(task)
break
try:
for attempt in six.moves.range(self.conf.retries + 1):
if attempt > 0:
LOG.debug("Attempting to run task %s for the %s time",
task.name, attempt + 1)
else:
LOG.debug("Attempting to run task %s for the first"
" time", task.name)
try:
task.run()
if task.success:
break
except Exception:
LOG.exception('Unhandled error when running %s',
task.name)
# try again...
task.reset()
if task.success:
for next_task in task.followups:
LOG.debug('Added next task %s to queue',
next_task.name)
self.queue.put(next_task)
finally:
self.queue.task_done()
class KollaWorker(object): class KollaWorker(object):
@ -715,7 +778,7 @@ class KollaWorker(object):
parent['children'].append(image) parent['children'].append(image)
image['parent'] = parent image['parent'] = parent
def build_queue(self): def build_queue(self, push_queue):
"""Organizes Queue list """Organizes Queue list
Return a list of Queues that have been organized into a hierarchy Return a list of Queues that have been organized into a hierarchy
@ -729,7 +792,7 @@ class KollaWorker(object):
for image in self.images: for image in self.images:
if image['parent'] is None: if image['parent'] is None:
queue.put(image) queue.put(BuildTask(self.conf, image, push_queue))
LOG.debug('%s:Added image to queue', image['name']) LOG.debug('%s:Added image to queue', image['name'])
return queue return queue
@ -760,9 +823,6 @@ def run_build():
# to work like we want. A different size or hash will still force a rebuild # to work like we want. A different size or hash will still force a rebuild
kolla.set_time() kolla.set_time()
queue = kolla.build_queue()
push_queue = six.moves.queue.Queue()
if conf.save_dependency: if conf.save_dependency:
kolla.save_dependency(conf.save_dependency) kolla.save_dependency(conf.save_dependency)
LOG.info('Docker images dependency is saved in %s', LOG.info('Docker images dependency is saved in %s',
@ -775,14 +835,20 @@ def run_build():
kolla.list_dependencies() kolla.list_dependencies()
return return
push_queue = six.moves.queue.Queue()
queue = kolla.build_queue(push_queue)
workers = []
for x in six.moves.range(conf.threads): for x in six.moves.range(conf.threads):
worker = WorkerThread(queue, push_queue, conf) worker = WorkerThread(conf, queue)
worker.setDaemon(True) worker.setDaemon(True)
worker.start() worker.start()
workers.append(worker)
for x in six.moves.range(conf.push_threads): for x in six.moves.range(conf.push_threads):
push_thread = PushThread(conf, push_queue) worker = WorkerThread(conf, push_queue)
push_thread.start() worker.start()
workers.append(worker)
# sleep until queue is empty # sleep until queue is empty
while queue.unfinished_tasks or push_queue.unfinished_tasks: while queue.unfinished_tasks or push_queue.unfinished_tasks:
@ -791,6 +857,12 @@ def run_build():
kolla.summary() kolla.summary()
kolla.cleanup() kolla.cleanup()
# ensure all threads exited happily
queue.put(WorkerThread.tombstone)
push_queue.put(WorkerThread.tombstone)
for w in workers:
w.join()
return kolla.get_image_statuses() return kolla.get_image_statuses()

42
kolla/common/task.py Normal file
View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Task(object):
    """Abstract base for a named unit of work.

    Concrete subclasses must supply ``name`` and ``run``.  The outcome of
    the work is tracked in the ``success`` attribute, which starts out
    (and is reset to) ``False``.
    """

    def __init__(self):
        # Nothing has been run yet, so the task has not succeeded.
        self.success = False

    def reset(self):
        """Clear the success flag back to its initial ``False`` state."""
        self.success = False

    @abc.abstractproperty
    def name(self):
        """Identifier for this task; must be provided by subclasses."""
        pass

    @abc.abstractmethod
    def run(self):
        """Perform the work of this task; must be provided by subclasses."""
        pass

    @property
    def followups(self):
        """Follow-up tasks for this task; the base class has none."""
        return []

    @staticmethod
    def set_status(status):
        """Do nothing with ``status``; placeholder slated for removal."""
        # TODO(harlowja): remove this.
        pass

View File

@ -31,29 +31,36 @@ FAKE_IMAGE = {
} }
class WorkerThreadTest(base.TestCase): class TasksTest(base.TestCase):
def setUp(self): def setUp(self):
super(WorkerThreadTest, self).setUp() super(TasksTest, self).setUp()
self.image = FAKE_IMAGE.copy() self.image = FAKE_IMAGE.copy()
# NOTE(jeffrey4l): use a real, temporary dir # NOTE(jeffrey4l): use a real, temporary dir
self.image['path'] = self.useFixture(fixtures.TempDir()).path self.image['path'] = self.useFixture(fixtures.TempDir()).path
@mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client')
def test_push_image(self, mock_client):
pusher = build.PushTask(self.conf, self.image)
pusher.run()
mock_client().push.assert_called_once_with(
self.image['fullname'], stream=True, insecure_registry=True)
@mock.patch.dict(os.environ, clear=True) @mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_image(self, mock_client): def test_build_image(self, mock_client):
queue = mock.Mock()
push_queue = mock.Mock() push_queue = mock.Mock()
worker = build.WorkerThread(queue, builder = build.BuildTask(self.conf, self.image, push_queue)
push_queue, builder.run()
self.conf)
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=None) buildargs=None)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, clear=True) @mock.patch.dict(os.environ, clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_image_with_build_arg(self, mock_client): def test_build_image_with_build_arg(self, mock_client):
@ -62,33 +69,35 @@ class WorkerThreadTest(base.TestCase):
'NO_PROXY': '127.0.0.1' 'NO_PROXY': '127.0.0.1'
} }
self.conf.set_override('build_args', build_args) self.conf.set_override('build_args', build_args)
worker = build.WorkerThread(mock.Mock(), push_queue = mock.Mock()
mock.Mock(), builder = build.BuildTask(self.conf, self.image, push_queue)
self.conf) builder.run()
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'},
clear=True) clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
def test_build_arg_from_env(self, mock_client): def test_build_arg_from_env(self, mock_client):
push_queue = mock.Mock()
build_args = { build_args = {
'http_proxy': 'http://FROM_ENV:8080', 'http_proxy': 'http://FROM_ENV:8080',
} }
worker = build.WorkerThread(mock.Mock(), builder = build.BuildTask(self.conf, self.image, push_queue)
mock.Mock(), builder.run()
self.conf)
worker.builder(self.image)
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'},
clear=True) clear=True)
@mock.patch('docker.Client') @mock.patch('docker.Client')
@ -97,30 +106,30 @@ class WorkerThreadTest(base.TestCase):
'http_proxy': 'http://localhost:8080', 'http_proxy': 'http://localhost:8080',
} }
self.conf.set_override('build_args', build_args) self.conf.set_override('build_args', build_args)
worker = build.WorkerThread(mock.Mock(),
mock.Mock(), push_queue = mock.Mock()
self.conf) builder = build.BuildTask(self.conf, self.image, push_queue)
worker.builder(self.image) builder.run()
mock_client().build.assert_called_once_with( mock_client().build.assert_called_once_with(
path=self.image['path'], tag=self.image['fullname'], path=self.image['path'], tag=self.image['fullname'],
nocache=False, rm=True, pull=True, forcerm=True, nocache=False, rm=True, pull=True, forcerm=True,
buildargs=build_args) buildargs=build_args)
self.assertTrue(builder.success)
@mock.patch('docker.Client') @mock.patch('docker.Client')
@mock.patch('requests.get') @mock.patch('requests.get')
def test_requests_get_timeout(self, mock_get, mock_client): def test_requests_get_timeout(self, mock_get, mock_client):
worker = build.WorkerThread(mock.Mock(),
mock.Mock(),
self.conf)
self.image['source'] = { self.image['source'] = {
'source': 'http://fake/source', 'source': 'http://fake/source',
'type': 'url', 'type': 'url',
'name': 'fake-image-base' 'name': 'fake-image-base'
} }
push_queue = mock.Mock()
builder = build.BuildTask(self.conf, self.image, push_queue)
mock_get.side_effect = requests.exceptions.Timeout mock_get.side_effect = requests.exceptions.Timeout
get_result = worker.process_source(self.image, get_result = builder.process_source(self.image, self.image['source'])
self.image['source'])
self.assertIsNone(get_result) self.assertIsNone(get_result)
self.assertEqual(self.image['status'], 'error') self.assertEqual(self.image['status'], 'error')
@ -128,6 +137,8 @@ class WorkerThreadTest(base.TestCase):
mock_get.assert_called_once_with(self.image['source']['source'], mock_get.assert_called_once_with(self.image['source']['source'],
timeout=120) timeout=120)
self.assertFalse(builder.success)
class KollaWorkerTest(base.TestCase): class KollaWorkerTest(base.TestCase):