Merge "Remove ThreadPool class"

This commit is contained in:
Jenkins 2016-05-11 01:37:40 +00:00 committed by Gerrit Code Review
commit f66898ae00
5 changed files with 28 additions and 409 deletions

View File

@ -145,10 +145,6 @@ class LockTimeout(MessageTimeout):
pass
class ThreadPoolDead(SwiftException):
pass
class RingBuilderError(SwiftException):
pass

View File

@ -47,8 +47,7 @@ import datetime
import eventlet
import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
greenio, event
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet.green import socket, threading
import eventlet.queue
import netifaces
@ -3213,205 +3212,6 @@ def tpool_reraise(func, *args, **kwargs):
return resp
class ThreadPool(object):
"""
Perform blocking operations in background threads.
Call its methods from within greenlets to green-wait for results without
blocking the eventlet reactor (hopefully).
"""
BYTE = 'a'.encode('utf-8')
def __init__(self, nthreads=2):
self.nthreads = nthreads
self._run_queue = stdlib_queue.Queue()
self._result_queue = stdlib_queue.Queue()
self._threads = []
self._alive = True
if nthreads <= 0:
return
# We spawn a greenthread whose job it is to pull results from the
# worker threads via a real Queue and send them to eventlet Events so
# that the calling greenthreads can be awoken.
#
# Since each OS thread has its own collection of greenthreads, it
# doesn't work to have the worker thread send stuff to the event, as
# it then notifies its own thread-local eventlet hub to wake up, which
# doesn't do anything to help out the actual calling greenthread over
# in the main thread.
#
# Thus, each worker sticks its results into a result queue and then
# writes a byte to a pipe, signaling the result-consuming greenlet (in
# the main thread) to wake up and consume results.
#
# This is all stuff that eventlet.tpool does, but that code can't have
# multiple instances instantiated. Since the object server uses one
# pool per disk, we have to reimplement this stuff.
_raw_rpipe, self.wpipe = os.pipe()
self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb')
for _junk in range(nthreads):
thr = stdlib_threading.Thread(
target=self._worker,
args=(self._run_queue, self._result_queue))
thr.daemon = True
thr.start()
self._threads.append(thr)
# This is the result-consuming greenthread that runs in the main OS
# thread, as described above.
self._consumer_coro = greenthread.spawn_n(self._consume_results,
self._result_queue)
def _worker(self, work_queue, result_queue):
"""
Pulls an item from the queue and runs it, then puts the result into
the result queue. Repeats forever.
:param work_queue: queue from which to pull work
:param result_queue: queue into which to place results
"""
while True:
item = work_queue.get()
if item is None:
break
ev, func, args, kwargs = item
try:
result = func(*args, **kwargs)
result_queue.put((ev, True, result))
except BaseException:
result_queue.put((ev, False, sys.exc_info()))
finally:
work_queue.task_done()
os.write(self.wpipe, self.BYTE)
def _consume_results(self, queue):
"""
Runs as a greenthread in the same OS thread as callers of
run_in_thread().
Takes results from the worker OS threads and sends them to the waiting
greenthreads.
"""
while True:
try:
self.rpipe.read(1)
except ValueError:
# can happen at process shutdown when pipe is closed
break
while True:
try:
ev, success, result = queue.get(block=False)
except stdlib_queue.Empty:
break
try:
if success:
ev.send(result)
else:
ev.send_exception(*result)
finally:
queue.task_done()
def run_in_thread(self, func, *args, **kwargs):
"""
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, it invokes
``func(*args, **kwargs)`` directly, followed by eventlet.sleep() to
ensure the eventlet hub has a chance to execute. It is more likely the
hub will be invoked when queuing operations to an external thread.
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
result = func(*args, **kwargs)
sleep()
return result
ev = event.Event()
self._run_queue.put((ev, func, args, kwargs), block=False)
# blocks this greenlet (and only *this* greenlet) until the real
# thread calls ev.send().
result = ev.wait()
return result
def _run_in_eventlet_tpool(self, func, *args, **kwargs):
"""
Really run something in an external thread, even if we haven't got any
threads of our own.
"""
def inner():
try:
return (True, func(*args, **kwargs))
except (Timeout, BaseException) as err:
return (False, err)
success, result = tpool.execute(inner)
if success:
return result
else:
raise result
def force_run_in_thread(self, func, *args, **kwargs):
"""
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, uses eventlet.tpool
to run the function. This is in contrast to run_in_thread(), which
will (in that case) simply execute func in the calling thread.
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
return self._run_in_eventlet_tpool(func, *args, **kwargs)
else:
return self.run_in_thread(func, *args, **kwargs)
def terminate(self):
"""
Releases the threadpool's resources (OS threads, greenthreads, pipes,
etc.) and renders it unusable.
Don't call run_in_thread() or force_run_in_thread() after calling
terminate().
"""
self._alive = False
if self.nthreads <= 0:
return
for _junk in range(self.nthreads):
self._run_queue.put(None)
for thr in self._threads:
thr.join()
self._threads = []
self.nthreads = 0
greenthread.kill(self._consumer_coro)
self.rpipe.close()
os.close(self.wpipe)
def ismount(path):
"""
Test whether a path is a mount point. This will catch any

View File

@ -55,9 +55,10 @@ from swift.common.constraints import check_mount, check_dir
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
fsync_dir, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
fsync_dir, drop_buffer_cache, lock_path, write_pickle, \
config_true_value, listdir, split_path, ismount, remove_file, \
get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps
get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
tpool_reraise
from swift.common.splice import splice, tee
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@ -536,7 +537,6 @@ class BaseDiskFileManager(object):
conf.get('replication_one_per_device', 'true'))
self.replication_lock_timeout = int(conf.get(
'replication_lock_timeout', 15))
self.threadpools = defaultdict(lambda: ThreadPool(nthreads=0))
self.use_splice = False
self.pipe_size = None
@ -1115,8 +1115,7 @@ class BaseDiskFileManager(object):
device_path = self.construct_dev_path(device)
async_dir = os.path.join(device_path, get_async_dir(policy))
ohash = hash_path(account, container, obj)
self.threadpools[device].run_in_thread(
write_pickle,
write_pickle(
data,
os.path.join(async_dir, ohash[-3:], ohash + '-' +
Timestamp(timestamp).internal),
@ -1139,7 +1138,7 @@ class BaseDiskFileManager(object):
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
return self.diskfile_cls(self, dev_path, self.threadpools[device],
return self.diskfile_cls(self, dev_path,
partition, account, container, obj,
policy=policy, use_splice=self.use_splice,
pipe_size=self.pipe_size, **kwargs)
@ -1215,7 +1214,7 @@ class BaseDiskFileManager(object):
metadata.get('name', ''), 3, 3, True)
except ValueError:
raise DiskFileNotExist()
return self.diskfile_cls(self, dev_path, self.threadpools[device],
return self.diskfile_cls(self, dev_path,
partition, account, container, obj,
policy=policy, **kwargs)
@ -1235,7 +1234,7 @@ class BaseDiskFileManager(object):
partition)
if not os.path.exists(partition_path):
mkdirs(partition_path)
_junk, hashes = self.threadpools[device].force_run_in_thread(
_junk, hashes = tpool_reraise(
self._get_hashes, partition_path, recalculate=suffixes)
return hashes
@ -1368,19 +1367,16 @@ class BaseDiskFileWriter(object):
:param fd: open file descriptor of temporary file to receive data
:param tmppath: full path name of the opened file descriptor
:param bytes_per_sync: number bytes written between sync calls
:param threadpool: internal thread pool to use for disk operations
:param diskfile: the diskfile creating this DiskFileWriter instance
"""
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool,
diskfile):
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile):
# Parameter tracking
self._name = name
self._datadir = datadir
self._fd = fd
self._tmppath = tmppath
self._bytes_per_sync = bytes_per_sync
self._threadpool = threadpool
self._diskfile = diskfile
# Internal attributes
@ -1409,18 +1405,15 @@ class BaseDiskFileWriter(object):
:returns: the total number of bytes written to an object
"""
def _write_entire_chunk(chunk):
while chunk:
written = os.write(self._fd, chunk)
self._upload_size += written
chunk = chunk[written:]
self._threadpool.run_in_thread(_write_entire_chunk, chunk)
while chunk:
written = os.write(self._fd, chunk)
self._upload_size += written
chunk = chunk[written:]
# For large files sync every 512MB (by default) written
diff = self._upload_size - self._last_sync
if diff >= self._bytes_per_sync:
self._threadpool.force_run_in_thread(fdatasync, self._fd)
tpool_reraise(fdatasync, self._fd)
drop_buffer_cache(self._fd, self._last_sync, diff)
self._last_sync = self._upload_size
@ -1477,8 +1470,7 @@ class BaseDiskFileWriter(object):
metadata['name'] = self._name
target_path = join(self._datadir, filename)
self._threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path, cleanup)
tpool_reraise(self._finalize_put, metadata, target_path, cleanup)
def put(self, metadata):
"""
@ -1521,7 +1513,6 @@ class BaseDiskFileReader(object):
:param data_file: on-disk data file name for the object
:param obj_size: verified on-disk size of the object
:param etag: expected metadata etag value for entire file
:param threadpool: thread pool to use for read operations
:param disk_chunk_size: size of reads from disk in bytes
:param keep_cache_size: maximum object size that will be kept in cache
:param device_path: on-disk device path, used when quarantining an obj
@ -1532,7 +1523,7 @@ class BaseDiskFileReader(object):
:param diskfile: the diskfile creating this DiskFileReader instance
:param keep_cache: should resulting reads be kept in the buffer cache
"""
def __init__(self, fp, data_file, obj_size, etag, threadpool,
def __init__(self, fp, data_file, obj_size, etag,
disk_chunk_size, keep_cache_size, device_path, logger,
quarantine_hook, use_splice, pipe_size, diskfile,
keep_cache=False):
@ -1541,7 +1532,6 @@ class BaseDiskFileReader(object):
self._data_file = data_file
self._obj_size = obj_size
self._etag = etag
self._threadpool = threadpool
self._diskfile = diskfile
self._disk_chunk_size = disk_chunk_size
self._device_path = device_path
@ -1580,8 +1570,7 @@ class BaseDiskFileReader(object):
self._started_at_0 = True
self._iter_etag = hashlib.md5()
while True:
chunk = self._threadpool.run_in_thread(
self._fp.read, self._disk_chunk_size)
chunk = self._fp.read(self._disk_chunk_size)
if chunk:
if self._iter_etag:
self._iter_etag.update(chunk)
@ -1634,8 +1623,8 @@ class BaseDiskFileReader(object):
try:
while True:
# Read data from disk to pipe
(bytes_in_pipe, _1, _2) = self._threadpool.run_in_thread(
splice, rfd, None, client_wpipe, None, pipe_size, 0)
(bytes_in_pipe, _1, _2) = splice(
rfd, None, client_wpipe, None, pipe_size, 0)
if bytes_in_pipe == 0:
self._read_to_eof = True
self._drop_cache(rfd, dropped_cache,
@ -1758,9 +1747,8 @@ class BaseDiskFileReader(object):
drop_buffer_cache(fd, offset, length)
def _quarantine(self, msg):
self._quarantined_dir = self._threadpool.run_in_thread(
self.manager.quarantine_renamer, self._device_path,
self._data_file)
self._quarantined_dir = self.manager.quarantine_renamer(
self._device_path, self._data_file)
self._logger.warning("Quarantined object %s: %s" % (
self._data_file, msg))
self._logger.increment('quarantines')
@ -1824,7 +1812,6 @@ class BaseDiskFile(object):
:param mgr: associated DiskFileManager instance
:param device_path: path to the target device or drive
:param threadpool: thread pool to use for blocking operations
:param partition: partition on the device in which the object lives
:param account: account name for the object
:param container: container name for the object
@ -1837,12 +1824,11 @@ class BaseDiskFile(object):
reader_cls = None # must be set by subclasses
writer_cls = None # must be set by subclasses
def __init__(self, mgr, device_path, threadpool, partition,
def __init__(self, mgr, device_path, partition,
account=None, container=None, obj=None, _datadir=None,
policy=None, use_splice=False, pipe_size=None, **kwargs):
self._manager = mgr
self._device_path = device_path
self._threadpool = threadpool or ThreadPool(nthreads=0)
self._logger = mgr.logger
self._disk_chunk_size = mgr.disk_chunk_size
self._bytes_per_sync = mgr.bytes_per_sync
@ -2043,8 +2029,8 @@ class BaseDiskFile(object):
:param msg: reason for quarantining to be included in the exception
:returns: DiskFileQuarantined exception object
"""
self._quarantined_dir = self._threadpool.run_in_thread(
self.manager.quarantine_renamer, self._device_path, data_file)
self._quarantined_dir = self.manager.quarantine_renamer(
self._device_path, data_file)
self._logger.warning("Quarantined object %s: %s" % (
data_file, msg))
self._logger.increment('quarantines')
@ -2333,7 +2319,7 @@ class BaseDiskFile(object):
"""
dr = self.reader_cls(
self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
self._metadata['ETag'], self._disk_chunk_size,
self._manager.keep_cache_size, self._device_path, self._logger,
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache)
@ -2378,7 +2364,6 @@ class BaseDiskFile(object):
raise
dfw = self.writer_cls(self._name, self._datadir, fd, tmppath,
bytes_per_sync=self._bytes_per_sync,
threadpool=self._threadpool,
diskfile=self)
yield dfw
finally:
@ -2566,8 +2551,7 @@ class ECDiskFileWriter(BaseDiskFileWriter):
"""
durable_file_path = os.path.join(
self._datadir, timestamp.internal + '.durable')
self._threadpool.force_run_in_thread(
self._finalize_durable, durable_file_path)
tpool_reraise(self._finalize_durable, durable_file_path)
def put(self, metadata):
"""

View File

@ -30,7 +30,6 @@ import mock
import random
import re
import socket
import stat
import sys
import json
import math
@ -43,7 +42,6 @@ from textwrap import dedent
import tempfile
import time
import traceback
import unittest
import fcntl
import shutil
@ -58,7 +56,7 @@ from six.moves.configparser import NoSectionError, NoOptionError
from swift.common.exceptions import Timeout, MessageTimeout, \
ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \
MimeInvalid, ThreadPoolDead
MimeInvalid
from swift.common import utils
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.header_key_dict import HeaderKeyDict
@ -4727,165 +4725,6 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
self.assertEqual(called, [12345])
class TestThreadPool(unittest.TestCase):
def setUp(self):
self.tp = None
def tearDown(self):
if self.tp:
self.tp.terminate()
def _pipe_count(self):
# Counts the number of pipes that this process owns.
fd_dir = "/proc/%d/fd" % os.getpid()
def is_pipe(path):
try:
stat_result = os.stat(path)
return stat.S_ISFIFO(stat_result.st_mode)
except OSError:
return False
return len([fd for fd in os.listdir(fd_dir)
if is_pipe(os.path.join(fd_dir, fd))])
def _thread_id(self):
return threading.current_thread().ident
def _capture_args(self, *args, **kwargs):
return {'args': args, 'kwargs': kwargs}
def _raise_valueerror(self):
return int('fishcakes')
def test_run_in_thread_with_threads(self):
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
self.assertNotEqual(my_id, other_id)
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEqual(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
caught = False
try:
tp.run_in_thread(self._raise_valueerror)
except ValueError:
caught = True
self.assertTrue(caught)
def test_force_run_in_thread_with_threads(self):
# with nthreads > 0, force_run_in_thread looks just like run_in_thread
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
self.assertNotEqual(my_id, other_id)
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEqual(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
self.assertRaises(ValueError, tp.force_run_in_thread,
self._raise_valueerror)
def test_run_in_thread_without_threads(self):
# with zero threads, run_in_thread doesn't actually do so
tp = utils.ThreadPool(0)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
self.assertEqual(my_id, other_id)
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEqual(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
self.assertRaises(ValueError, tp.run_in_thread,
self._raise_valueerror)
def test_force_run_in_thread_without_threads(self):
# with zero threads, force_run_in_thread uses eventlet.tpool
tp = utils.ThreadPool(0)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
self.assertNotEqual(my_id, other_id)
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
self.assertEqual(result, {'args': (1, 2),
'kwargs': {'bert': 'ernie'}})
self.assertRaises(ValueError, tp.force_run_in_thread,
self._raise_valueerror)
def test_preserving_stack_trace_from_thread(self):
def gamma():
return 1 / 0 # ZeroDivisionError
def beta():
return gamma()
def alpha():
return beta()
tp = self.tp = utils.ThreadPool(1)
try:
tp.run_in_thread(alpha)
except ZeroDivisionError:
# NB: format is (filename, line number, function name, text)
tb_func = [elem[2] for elem
in traceback.extract_tb(sys.exc_info()[2])]
else:
self.fail("Expected ZeroDivisionError")
self.assertEqual(tb_func[-1], "gamma")
self.assertEqual(tb_func[-2], "beta")
self.assertEqual(tb_func[-3], "alpha")
# omit the middle; what's important is that the start and end are
# included, not the exact names of helper methods
self.assertEqual(tb_func[1], "run_in_thread")
self.assertEqual(tb_func[0], "test_preserving_stack_trace_from_thread")
def test_terminate(self):
initial_thread_count = threading.activeCount()
initial_pipe_count = self._pipe_count()
tp = utils.ThreadPool(4)
# do some work to ensure any lazy initialization happens
tp.run_in_thread(os.path.join, 'foo', 'bar')
tp.run_in_thread(os.path.join, 'baz', 'quux')
# 4 threads in the ThreadPool, plus one pipe for IPC; this also
# serves as a sanity check that we're actually allocating some
# resources to free later
self.assertEqual(initial_thread_count, threading.activeCount() - 4)
self.assertEqual(initial_pipe_count, self._pipe_count() - 2)
tp.terminate()
self.assertEqual(initial_thread_count, threading.activeCount())
self.assertEqual(initial_pipe_count, self._pipe_count())
def test_cant_run_after_terminate(self):
tp = utils.ThreadPool(0)
tp.terminate()
self.assertRaises(ThreadPoolDead, tp.run_in_thread, lambda: 1)
self.assertRaises(ThreadPoolDead, tp.force_run_in_thread, lambda: 1)
def test_double_terminate_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
tp.terminate()
tp = utils.ThreadPool(1)
tp.terminate()
tp.terminate()
def test_terminate_no_threads_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
class TestAuditLocationGenerator(unittest.TestCase):
def test_drive_tree_access(self):

View File

@ -975,7 +975,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
dfclass.assert_called_once_with(
self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9',
self.df_mgr, '/srv/dev/', '9',
'a', 'c', 'o', policy=POLICIES[0])
hclistdir.assert_called_once_with(
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900',