Use threadpools in the object server for performance.
Without a (per-disk) threadpool, requests to a slow disk would affect all clients by blocking the entire eventlet reactor on read/write/etc. The slower the disk, the worse the performance. On an object server, you frequently have at least one slow disk due to auditing and replication activity sucking up all the available IO. By kicking those blocking calls out to a separate OS thread, we let the eventlet reactor make progress in other greenthreads, and by having a per-disk pool, we ensure that one slow disk can't suck up all the resources of an entire object server. There were a few blocking calls that were done with eventlet.tpool, but that's a fixed-size global threadpool, so I moved them to the per-disk threadpools. If the object server is configured not to use per-disk threadpools, (i.e. threads_per_disk = 0, which is the default), those call sites will still ultimately end up using eventlet.tpool.execute. You won't end up blocking a whole object server while waiting for a huge fsync. If you decide not to use threadpools, the only extra overhead should be a few extra Python function calls here and there. This is accomplished by setting threads_per_disk = 0 in the config. blueprint concurrent-disk-io Change-Id: I490f8753d926fdcee3a0c65c5aaf715bc2b7c290
This commit is contained in:
parent
63a5a93734
commit
b491549ac2
@ -378,6 +378,13 @@ mb_per_sync 512 On PUT requests, sync file every n MB
|
||||
keep_cache_size 5242880 Largest object size to keep in buffer cache
|
||||
keep_cache_private false Allow non-public objects to stay in
|
||||
kernel's buffer cache
|
||||
threads_per_disk 0 Size of the per-disk thread pool used for
|
||||
performing disk I/O. The default of 0 means
|
||||
to not use a per-disk thread pool. It is
|
||||
recommended to keep this value small, as
|
||||
large values can result in high read
|
||||
latencies due to large queue depths. A good
|
||||
starting point is 4 threads per disk.
|
||||
================== ============= ===========================================
|
||||
|
||||
[object-replicator]
|
||||
|
@ -70,6 +70,8 @@ use = egg:swift#object
|
||||
# verbs, set to "False". Unless you have a separate replication network, you
|
||||
# should not specify any value for "replication_server".
|
||||
# replication_server = False
|
||||
# A value of 0 means "don't use thread pools". A reasonable starting point is 4.
|
||||
# threads_per_disk = 0
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -20,6 +20,7 @@ import fcntl
|
||||
import os
|
||||
import pwd
|
||||
import sys
|
||||
import threading as stdlib_threading
|
||||
import time
|
||||
import uuid
|
||||
import functools
|
||||
@ -32,6 +33,7 @@ import ctypes.util
|
||||
from ConfigParser import ConfigParser, NoSectionError, NoOptionError, \
|
||||
RawConfigParser
|
||||
from optparse import OptionParser
|
||||
from Queue import Queue, Empty
|
||||
from tempfile import mkstemp, NamedTemporaryFile
|
||||
try:
|
||||
import simplejson as json
|
||||
@ -44,7 +46,8 @@ import itertools
|
||||
|
||||
import eventlet
|
||||
import eventlet.semaphore
|
||||
from eventlet import GreenPool, sleep, Timeout, tpool
|
||||
from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
|
||||
greenio, event
|
||||
from eventlet.green import socket, threading
|
||||
import netifaces
|
||||
import codecs
|
||||
@ -1755,3 +1758,164 @@ def tpool_reraise(func, *args, **kwargs):
|
||||
if isinstance(resp, BaseException):
|
||||
raise resp
|
||||
return resp
|
||||
|
||||
|
||||
class ThreadPool(object):
|
||||
BYTE = 'a'.encode('utf-8')
|
||||
|
||||
"""
|
||||
Perform blocking operations in background threads.
|
||||
|
||||
Call its methods from within greenlets to green-wait for results without
|
||||
blocking the eventlet reactor (hopefully).
|
||||
"""
|
||||
def __init__(self, nthreads=2):
|
||||
self.nthreads = nthreads
|
||||
self._run_queue = Queue()
|
||||
self._result_queue = Queue()
|
||||
self._threads = []
|
||||
|
||||
if nthreads <= 0:
|
||||
return
|
||||
|
||||
# We spawn a greenthread whose job it is to pull results from the
|
||||
# worker threads via a real Queue and send them to eventlet Events so
|
||||
# that the calling greenthreads can be awoken.
|
||||
#
|
||||
# Since each OS thread has its own collection of greenthreads, it
|
||||
# doesn't work to have the worker thread send stuff to the event, as
|
||||
# it then notifies its own thread-local eventlet hub to wake up, which
|
||||
# doesn't do anything to help out the actual calling greenthread over
|
||||
# in the main thread.
|
||||
#
|
||||
# Thus, each worker sticks its results into a result queue and then
|
||||
# writes a byte to a pipe, signaling the result-consuming greenlet (in
|
||||
# the main thread) to wake up and consume results.
|
||||
#
|
||||
# This is all stuff that eventlet.tpool does, but that code can't have
|
||||
# multiple instances instantiated. Since the object server uses one
|
||||
# pool per disk, we have to reimplement this stuff.
|
||||
_raw_rpipe, self.wpipe = os.pipe()
|
||||
self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
|
||||
|
||||
for _junk in xrange(nthreads):
|
||||
thr = stdlib_threading.Thread(
|
||||
target=self._worker,
|
||||
args=(self._run_queue, self._result_queue))
|
||||
thr.daemon = True
|
||||
thr.start()
|
||||
self._threads.append(thr)
|
||||
|
||||
# This is the result-consuming greenthread that runs in the main OS
|
||||
# thread, as described above.
|
||||
self._consumer_coro = greenthread.spawn_n(self._consume_results,
|
||||
self._result_queue)
|
||||
|
||||
def _worker(self, work_queue, result_queue):
|
||||
"""
|
||||
Pulls an item from the queue and runs it, then puts the result into
|
||||
the result queue. Repeats forever.
|
||||
|
||||
:param work_queue: queue from which to pull work
|
||||
:param result_queue: queue into which to place results
|
||||
"""
|
||||
while True:
|
||||
item = work_queue.get()
|
||||
ev, func, args, kwargs = item
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
result_queue.put((ev, True, result))
|
||||
except BaseException, err:
|
||||
result_queue.put((ev, False, err))
|
||||
finally:
|
||||
work_queue.task_done()
|
||||
os.write(self.wpipe, self.BYTE)
|
||||
|
||||
def _consume_results(self, queue):
|
||||
"""
|
||||
Runs as a greenthread in the same OS thread as callers of
|
||||
run_in_thread().
|
||||
|
||||
Takes results from the worker OS threads and sends them to the waiting
|
||||
greenthreads.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
self.rpipe.read(1)
|
||||
except ValueError:
|
||||
# can happen at process shutdown when pipe is closed
|
||||
break
|
||||
|
||||
while True:
|
||||
try:
|
||||
ev, success, result = queue.get(block=False)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
try:
|
||||
if success:
|
||||
ev.send(result)
|
||||
else:
|
||||
ev.send_exception(result)
|
||||
finally:
|
||||
queue.task_done()
|
||||
|
||||
def run_in_thread(self, func, *args, **kwargs):
|
||||
"""
|
||||
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
|
||||
until results are available.
|
||||
|
||||
Exceptions thrown will be reraised in the calling thread.
|
||||
|
||||
If the threadpool was initialized with nthreads=0, just calls
|
||||
func(*args, **kwargs).
|
||||
|
||||
:returns: result of calling func
|
||||
:raises: whatever func raises
|
||||
"""
|
||||
if self.nthreads <= 0:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
ev = event.Event()
|
||||
self._run_queue.put((ev, func, args, kwargs), block=False)
|
||||
|
||||
# blocks this greenlet (and only *this* greenlet) until the real
|
||||
# thread calls ev.send().
|
||||
result = ev.wait()
|
||||
return result
|
||||
|
||||
def _run_in_eventlet_tpool(self, func, *args, **kwargs):
|
||||
"""
|
||||
Really run something in an external thread, even if we haven't got any
|
||||
threads of our own.
|
||||
"""
|
||||
def inner():
|
||||
try:
|
||||
return (True, func(*args, **kwargs))
|
||||
except (Timeout, BaseException) as err:
|
||||
return (False, err)
|
||||
|
||||
success, result = tpool.execute(inner)
|
||||
if success:
|
||||
return result
|
||||
else:
|
||||
raise result
|
||||
|
||||
def force_run_in_thread(self, func, *args, **kwargs):
|
||||
"""
|
||||
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
|
||||
until results are available.
|
||||
|
||||
Exceptions thrown will be reraised in the calling thread.
|
||||
|
||||
If the threadpool was initialized with nthreads=0, uses eventlet.tpool
|
||||
to run the function. This is in contrast to run_in_thread(), which
|
||||
will (in that case) simply execute func in the calling thread.
|
||||
|
||||
:returns: result of calling func
|
||||
:raises: whatever func raises
|
||||
"""
|
||||
if self.nthreads <= 0:
|
||||
return self._run_in_eventlet_tpool(func, *args, **kwargs)
|
||||
else:
|
||||
return self.run_in_thread(func, *args, **kwargs)
|
||||
|
@ -21,6 +21,7 @@ import errno
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from hashlib import md5
|
||||
from tempfile import mkstemp
|
||||
@ -28,13 +29,13 @@ from urllib import unquote
|
||||
from contextlib import contextmanager
|
||||
|
||||
from xattr import getxattr, setxattr
|
||||
from eventlet import sleep, Timeout, tpool
|
||||
from eventlet import sleep, Timeout
|
||||
|
||||
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
||||
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
||||
config_true_value, validate_device_partition, timing_stats, \
|
||||
tpool_reraise
|
||||
ThreadPool
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_object_creation, check_mount, \
|
||||
check_float, check_utf8
|
||||
@ -100,12 +101,13 @@ class DiskWriter(object):
|
||||
requests. Serves as the context manager object for DiskFile's writer()
|
||||
method.
|
||||
"""
|
||||
def __init__(self, disk_file, fd, tmppath):
|
||||
def __init__(self, disk_file, fd, tmppath, threadpool):
|
||||
self.disk_file = disk_file
|
||||
self.fd = fd
|
||||
self.tmppath = tmppath
|
||||
self.upload_size = 0
|
||||
self.last_sync = 0
|
||||
self.threadpool = threadpool
|
||||
|
||||
def write(self, chunk):
|
||||
"""
|
||||
@ -113,16 +115,21 @@ class DiskWriter(object):
|
||||
|
||||
:param chunk: the chunk of data to write as a string object
|
||||
"""
|
||||
while chunk:
|
||||
written = os.write(self.fd, chunk)
|
||||
self.upload_size += written
|
||||
chunk = chunk[written:]
|
||||
# For large files sync every 512MB (by default) written
|
||||
diff = self.upload_size - self.last_sync
|
||||
if diff >= self.disk_file.bytes_per_sync:
|
||||
tpool.execute(fdatasync, self.fd)
|
||||
drop_buffer_cache(self.fd, self.last_sync, diff)
|
||||
self.last_sync = self.upload_size
|
||||
|
||||
def _write_entire_chunk(chunk):
|
||||
while chunk:
|
||||
written = os.write(self.fd, chunk)
|
||||
self.upload_size += written
|
||||
chunk = chunk[written:]
|
||||
|
||||
self.threadpool.run_in_thread(_write_entire_chunk, chunk)
|
||||
|
||||
# For large files sync every 512MB (by default) written
|
||||
diff = self.upload_size - self.last_sync
|
||||
if diff >= self.disk_file.bytes_per_sync:
|
||||
self.threadpool.force_run_in_thread(fdatasync, self.fd)
|
||||
drop_buffer_cache(self.fd, self.last_sync, diff)
|
||||
self.last_sync = self.upload_size
|
||||
|
||||
def put(self, metadata, extension='.data'):
|
||||
"""
|
||||
@ -136,22 +143,27 @@ class DiskWriter(object):
|
||||
assert self.tmppath is not None
|
||||
timestamp = normalize_timestamp(metadata['X-Timestamp'])
|
||||
metadata['name'] = self.disk_file.name
|
||||
# Write the metadata before calling fsync() so that both data and
|
||||
# metadata are flushed to disk.
|
||||
write_metadata(self.fd, metadata)
|
||||
# We call fsync() before calling drop_cache() to lower the amount of
|
||||
# redundant work the drop cache code will perform on the pages (now
|
||||
# that after fsync the pages will be all clean).
|
||||
tpool.execute(fsync, self.fd)
|
||||
# From the Department of the Redundancy Department, make sure we
|
||||
# call drop_cache() after fsync() to avoid redundant work (pages
|
||||
# all clean).
|
||||
drop_buffer_cache(self.fd, 0, self.upload_size)
|
||||
invalidate_hash(os.path.dirname(self.disk_file.datadir))
|
||||
# After the rename completes, this object will be available for other
|
||||
# requests to reference.
|
||||
renamer(self.tmppath,
|
||||
os.path.join(self.disk_file.datadir, timestamp + extension))
|
||||
|
||||
def finalize_put():
|
||||
# Write the metadata before calling fsync() so that both data and
|
||||
# metadata are flushed to disk.
|
||||
write_metadata(self.fd, metadata)
|
||||
# We call fsync() before calling drop_cache() to lower the amount
|
||||
# of redundant work the drop cache code will perform on the pages
|
||||
# (now that after fsync the pages will be all clean).
|
||||
fsync(self.fd)
|
||||
# From the Department of the Redundancy Department, make sure
|
||||
# we call drop_cache() after fsync() to avoid redundant work
|
||||
# (pages all clean).
|
||||
drop_buffer_cache(self.fd, 0, self.upload_size)
|
||||
invalidate_hash(os.path.dirname(self.disk_file.datadir))
|
||||
# After the rename completes, this object will be available for
|
||||
# other requests to reference.
|
||||
renamer(self.tmppath,
|
||||
os.path.join(self.disk_file.datadir,
|
||||
timestamp + extension))
|
||||
|
||||
self.threadpool.force_run_in_thread(finalize_put)
|
||||
self.disk_file.metadata = metadata
|
||||
|
||||
|
||||
@ -169,12 +181,15 @@ class DiskFile(object):
|
||||
:param disk_chunk_size: size of chunks on file reads
|
||||
:param bytes_per_sync: number of bytes between fdatasync calls
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param threadpool: thread pool in which to do blocking operations
|
||||
|
||||
:raises DiskFileCollision: on md5 collision
|
||||
"""
|
||||
|
||||
def __init__(self, path, device, partition, account, container, obj,
|
||||
logger, keep_data_fp=False, disk_chunk_size=65536,
|
||||
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None):
|
||||
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
|
||||
threadpool=None):
|
||||
self.disk_chunk_size = disk_chunk_size
|
||||
self.bytes_per_sync = bytes_per_sync
|
||||
self.iter_hook = iter_hook
|
||||
@ -195,6 +210,7 @@ class DiskFile(object):
|
||||
self.quarantined_dir = None
|
||||
self.keep_cache = False
|
||||
self.suppress_file_closing = False
|
||||
self.threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
if not os.path.exists(self.datadir):
|
||||
return
|
||||
files = sorted(os.listdir(self.datadir), reverse=True)
|
||||
@ -240,7 +256,8 @@ class DiskFile(object):
|
||||
self.started_at_0 = True
|
||||
self.iter_etag = md5()
|
||||
while True:
|
||||
chunk = self.fp.read(self.disk_chunk_size)
|
||||
chunk = self.threadpool.run_in_thread(
|
||||
self.fp.read, self.disk_chunk_size)
|
||||
if chunk:
|
||||
if self.iter_etag:
|
||||
self.iter_etag.update(chunk)
|
||||
@ -366,7 +383,7 @@ class DiskFile(object):
|
||||
fallocate(fd, size)
|
||||
except OSError:
|
||||
raise DiskFileNoSpace()
|
||||
yield DiskWriter(self, fd, tmppath)
|
||||
yield DiskWriter(self, fd, tmppath, self.threadpool)
|
||||
finally:
|
||||
try:
|
||||
os.close(fd)
|
||||
@ -396,13 +413,16 @@ class DiskFile(object):
|
||||
:param timestamp: timestamp to compare with each file
|
||||
"""
|
||||
timestamp = normalize_timestamp(timestamp)
|
||||
for fname in os.listdir(self.datadir):
|
||||
if fname < timestamp:
|
||||
try:
|
||||
os.unlink(os.path.join(self.datadir, fname))
|
||||
except OSError, err: # pragma: no cover
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def _unlinkold():
|
||||
for fname in os.listdir(self.datadir):
|
||||
if fname < timestamp:
|
||||
try:
|
||||
os.unlink(os.path.join(self.datadir, fname))
|
||||
except OSError, err: # pragma: no cover
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
self.threadpool.run_in_thread(_unlinkold)
|
||||
|
||||
def _drop_cache(self, fd, offset, length):
|
||||
"""Method for no-oping buffer cache drop method."""
|
||||
@ -418,8 +438,8 @@ class DiskFile(object):
|
||||
directory otherwise None
|
||||
"""
|
||||
if not (self.is_deleted() or self.quarantined_dir):
|
||||
self.quarantined_dir = quarantine_renamer(self.device_path,
|
||||
self.data_file)
|
||||
self.quarantined_dir = self.threadpool.run_in_thread(
|
||||
quarantine_renamer, self.device_path, self.data_file)
|
||||
self.logger.increment('quarantines')
|
||||
return self.quarantined_dir
|
||||
|
||||
@ -436,7 +456,8 @@ class DiskFile(object):
|
||||
try:
|
||||
file_size = 0
|
||||
if self.data_file:
|
||||
file_size = os.path.getsize(self.data_file)
|
||||
file_size = self.threadpool.run_in_thread(
|
||||
os.path.getsize, self.data_file)
|
||||
if 'Content-Length' in self.metadata:
|
||||
metadata_size = int(self.metadata['Content-Length'])
|
||||
if file_size != metadata_size:
|
||||
@ -486,6 +507,9 @@ class ObjectController(object):
|
||||
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
|
||||
self.replication_server = replication_server
|
||||
self.allowed_methods = allowed_methods
|
||||
self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
|
||||
self.threadpools = defaultdict(
|
||||
lambda: ThreadPool(nthreads=self.threads_per_disk))
|
||||
default_allowed_headers = '''
|
||||
content-disposition,
|
||||
content-encoding,
|
||||
@ -547,7 +571,8 @@ class ObjectController(object):
|
||||
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
|
||||
ohash = hash_path(account, container, obj)
|
||||
self.logger.increment('async_pendings')
|
||||
write_pickle(
|
||||
self.threadpools[objdevice].run_in_thread(
|
||||
write_pickle,
|
||||
{'op': op, 'account': account, 'container': container,
|
||||
'obj': obj, 'headers': headers_out},
|
||||
os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
||||
@ -668,7 +693,8 @@ class ObjectController(object):
|
||||
disk_file = DiskFile(self.devices, device, partition, account,
|
||||
container, obj, self.logger,
|
||||
disk_chunk_size=self.disk_chunk_size,
|
||||
bytes_per_sync=self.bytes_per_sync)
|
||||
bytes_per_sync=self.bytes_per_sync,
|
||||
threadpool=self.threadpools[device])
|
||||
if disk_file.is_deleted() or disk_file.is_expired():
|
||||
return HTTPNotFound(request=request)
|
||||
try:
|
||||
@ -726,7 +752,8 @@ class ObjectController(object):
|
||||
disk_file = DiskFile(self.devices, device, partition, account,
|
||||
container, obj, self.logger,
|
||||
disk_chunk_size=self.disk_chunk_size,
|
||||
bytes_per_sync=self.bytes_per_sync)
|
||||
bytes_per_sync=self.bytes_per_sync,
|
||||
threadpool=self.threadpools[device])
|
||||
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
|
||||
orig_timestamp = disk_file.metadata.get('X-Timestamp')
|
||||
upload_expiration = time.time() + self.max_upload_time
|
||||
@ -811,6 +838,7 @@ class ObjectController(object):
|
||||
container, obj, self.logger, keep_data_fp=True,
|
||||
disk_chunk_size=self.disk_chunk_size,
|
||||
bytes_per_sync=self.bytes_per_sync,
|
||||
threadpool=self.threadpools[device],
|
||||
iter_hook=sleep)
|
||||
if disk_file.is_deleted() or disk_file.is_expired():
|
||||
if request.headers.get('if-match') == '*':
|
||||
@ -893,7 +921,8 @@ class ObjectController(object):
|
||||
disk_file = DiskFile(self.devices, device, partition, account,
|
||||
container, obj, self.logger,
|
||||
disk_chunk_size=self.disk_chunk_size,
|
||||
bytes_per_sync=self.bytes_per_sync)
|
||||
bytes_per_sync=self.bytes_per_sync,
|
||||
threadpool=self.threadpools[device])
|
||||
if disk_file.is_deleted() or disk_file.is_expired():
|
||||
return HTTPNotFound(request=request)
|
||||
try:
|
||||
@ -938,7 +967,8 @@ class ObjectController(object):
|
||||
disk_file = DiskFile(self.devices, device, partition, account,
|
||||
container, obj, self.logger,
|
||||
disk_chunk_size=self.disk_chunk_size,
|
||||
bytes_per_sync=self.bytes_per_sync)
|
||||
bytes_per_sync=self.bytes_per_sync,
|
||||
threadpool=self.threadpools[device])
|
||||
if 'x-if-delete-at' in request.headers and \
|
||||
int(request.headers['x-if-delete-at']) != \
|
||||
int(disk_file.metadata.get('X-Delete-At') or 0):
|
||||
@ -986,7 +1016,8 @@ class ObjectController(object):
|
||||
if not os.path.exists(path):
|
||||
mkdirs(path)
|
||||
suffixes = suffix.split('-') if suffix else []
|
||||
_junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
|
||||
_junk, hashes = self.threadpools[device].force_run_in_thread(
|
||||
get_hashes, path, recalculate=suffixes)
|
||||
return Response(body=pickle.dumps(hashes))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
|
@ -27,9 +27,9 @@ import re
|
||||
import socket
|
||||
import sys
|
||||
from textwrap import dedent
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
from threading import Thread
|
||||
from Queue import Queue, Empty
|
||||
from getpass import getuser
|
||||
from shutil import rmtree
|
||||
@ -1531,7 +1531,7 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
self.sock.bind(('localhost', 0))
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self.queue = Queue()
|
||||
self.reader_thread = Thread(target=self.statsd_reader)
|
||||
self.reader_thread = threading.Thread(target=self.statsd_reader)
|
||||
self.reader_thread.setDaemon(1)
|
||||
self.reader_thread.start()
|
||||
|
||||
@ -1815,5 +1815,91 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
self.assertEquals(called, [12345])
|
||||
|
||||
|
||||
class TestThreadpool(unittest.TestCase):
|
||||
|
||||
def _thread_id(self):
|
||||
return threading.current_thread().ident
|
||||
|
||||
def _capture_args(self, *args, **kwargs):
|
||||
return {'args': args, 'kwargs': kwargs}
|
||||
|
||||
def _raise_valueerror(self):
|
||||
return int('fishcakes')
|
||||
|
||||
def test_run_in_thread_with_threads(self):
|
||||
tp = utils.ThreadPool(1)
|
||||
|
||||
my_id = self._thread_id()
|
||||
other_id = tp.run_in_thread(self._thread_id)
|
||||
self.assertNotEquals(my_id, other_id)
|
||||
|
||||
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
|
||||
self.assertEquals(result, {'args': (1, 2),
|
||||
'kwargs': {'bert': 'ernie'}})
|
||||
|
||||
caught = False
|
||||
try:
|
||||
tp.run_in_thread(self._raise_valueerror)
|
||||
except ValueError:
|
||||
caught = True
|
||||
self.assertTrue(caught)
|
||||
|
||||
def test_force_run_in_thread_with_threads(self):
|
||||
# with nthreads > 0, force_run_in_thread looks just like run_in_thread
|
||||
tp = utils.ThreadPool(1)
|
||||
|
||||
my_id = self._thread_id()
|
||||
other_id = tp.force_run_in_thread(self._thread_id)
|
||||
self.assertNotEquals(my_id, other_id)
|
||||
|
||||
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
|
||||
self.assertEquals(result, {'args': (1, 2),
|
||||
'kwargs': {'bert': 'ernie'}})
|
||||
|
||||
caught = False
|
||||
try:
|
||||
tp.force_run_in_thread(self._raise_valueerror)
|
||||
except ValueError:
|
||||
caught = True
|
||||
self.assertTrue(caught)
|
||||
|
||||
def test_run_in_thread_without_threads(self):
|
||||
# with zero threads, run_in_thread doesn't actually do so
|
||||
tp = utils.ThreadPool(0)
|
||||
|
||||
my_id = self._thread_id()
|
||||
other_id = tp.run_in_thread(self._thread_id)
|
||||
self.assertEquals(my_id, other_id)
|
||||
|
||||
result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
|
||||
self.assertEquals(result, {'args': (1, 2),
|
||||
'kwargs': {'bert': 'ernie'}})
|
||||
|
||||
caught = False
|
||||
try:
|
||||
tp.run_in_thread(self._raise_valueerror)
|
||||
except ValueError:
|
||||
caught = True
|
||||
self.assertTrue(caught)
|
||||
|
||||
def test_force_run_in_thread_without_threads(self):
|
||||
# with zero threads, force_run_in_thread uses eventlet.tpool
|
||||
tp = utils.ThreadPool(0)
|
||||
|
||||
my_id = self._thread_id()
|
||||
other_id = tp.force_run_in_thread(self._thread_id)
|
||||
self.assertNotEquals(my_id, other_id)
|
||||
|
||||
result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
|
||||
self.assertEquals(result, {'args': (1, 2),
|
||||
'kwargs': {'bert': 'ernie'}})
|
||||
caught = False
|
||||
try:
|
||||
tp.force_run_in_thread(self._raise_valueerror)
|
||||
except ValueError:
|
||||
caught = True
|
||||
self.assertTrue(caught)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -49,13 +49,15 @@ class TestDiskFile(unittest.TestCase):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
|
||||
def fake_exe(*args, **kwargs):
|
||||
pass
|
||||
self._real_tpool_execute = tpool.execute
|
||||
def fake_exe(meth, *args, **kwargs):
|
||||
return meth(*args, **kwargs)
|
||||
tpool.execute = fake_exe
|
||||
|
||||
def tearDown(self):
|
||||
""" Tear down for testing swift.object_server.ObjectController """
|
||||
rmtree(os.path.dirname(self.testdir))
|
||||
tpool.execute = self._real_tpool_execute
|
||||
|
||||
def _create_test_file(self, data, keep_data_fp=True):
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
|
Loading…
x
Reference in New Issue
Block a user