diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 678f2cfae1..4f6e133f13 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -127,6 +127,13 @@ use = egg:swift#object # an abort to occur. # replication_failure_threshold = 100 # replication_failure_ratio = 1.0 +# +# Use splice() for zero-copy object GETs. This requires Linux kernel +# version 3.0 or greater. If you set "splice = yes" but the kernel +# does not support it, error messages will appear in the object server +# logs at startup, but your object servers should continue to function. +# +# splice = no [filter:healthcheck] use = egg:swift#healthcheck diff --git a/swift/common/utils.py b/swift/common/utils.py index e37dc34c61..6681cf7120 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -84,6 +84,11 @@ SysLogHandler.priority_map['NOTICE'] = 'notice' # These are lazily pulled from libc elsewhere _sys_fallocate = None _posix_fadvise = None +_libc_socket = None +_libc_bind = None +_libc_accept = None +_libc_splice = None +_libc_tee = None # If set to non-zero, fallocate routines will fail based on free space # available being at or below this amount, in bytes. @@ -97,6 +102,13 @@ HASH_PATH_PREFIX = '' SWIFT_CONF_FILE = '/etc/swift/swift.conf' +# These constants are Linux-specific, and Python doesn't seem to know +# about them. We ask anyway just in case that ever gets fixed. +# +# The values were copied from the Linux 3.0 kernel headers. +AF_ALG = getattr(socket, 'AF_ALG', 38) +F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031) + class InvalidHashPathConfigError(ValueError): @@ -292,16 +304,22 @@ def validate_configuration(): sys.exit("Error: %s" % e) -def load_libc_function(func_name, log_error=True): +def load_libc_function(func_name, log_error=True, + fail_if_missing=False): """ Attempt to find the function in libc, otherwise return a no-op func. :param func_name: name of the function to pull from libc. + :param log_error: log an error when a function can't be found + :param fail_if_missing: raise an exception when a function can't be found. + Default behavior is to return a no-op function. """ try: libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) return getattr(libc, func_name) except AttributeError: + if fail_if_missing: + raise if log_error: logging.warn(_("Unable to locate %s in libc. Leaving as a " "no-op."), func_name) @@ -3112,3 +3130,146 @@ def parse_content_disposition(header): attrs = attrs[len(m.group(0)):] attributes[m.group(1)] = m.group(2).strip('"') return header, attributes + + +class sockaddr_alg(ctypes.Structure): + _fields_ = [("salg_family", ctypes.c_ushort), + ("salg_type", ctypes.c_ubyte * 14), + ("salg_feat", ctypes.c_uint), + ("salg_mask", ctypes.c_uint), + ("salg_name", ctypes.c_ubyte * 64)] + + +_bound_md5_sockfd = None + + +def get_md5_socket(): + """ + Get an MD5 socket file descriptor. One can MD5 data with it by writing it + to the socket with os.write, then os.read the 16 bytes of the checksum out + later. + + NOTE: It is the caller's responsibility to ensure that os.close() is + called on the returned file descriptor. This is a bare file descriptor, + not a Python object. It doesn't close itself. + """ + + # Linux's AF_ALG sockets work like this: + # + # First, initialize a socket with socket() and bind(). This tells the + # socket what algorithm to use, as well as setting up any necessary bits + # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the + # algorithm name. + # + # Second, to hash some data, get a second socket by calling accept() on + # the first socket. Write data to the socket, then when finished, read the + # checksum from the socket and close it. This lets you checksum multiple + # things without repeating all the setup code each time. + # + # Since we only need to bind() one socket, we do that here and save it for + # future re-use. That way, we only use one file descriptor to get an MD5 + # socket instead of two, and we also get to save some syscalls. + + global _bound_md5_sockfd + global _libc_socket + global _libc_bind + global _libc_accept + + if _libc_accept is None: + _libc_accept = load_libc_function('accept', fail_if_missing=True) + if _libc_socket is None: + _libc_socket = load_libc_function('socket', fail_if_missing=True) + if _libc_bind is None: + _libc_bind = load_libc_function('bind', fail_if_missing=True) + + # Do this at first call rather than at import time so that we don't use a + # file descriptor on systems that aren't using any MD5 sockets. + if _bound_md5_sockfd is None: + sockaddr_setup = sockaddr_alg( + AF_ALG, + (ord('h'), ord('a'), ord('s'), ord('h'), 0), + 0, 0, + (ord('m'), ord('d'), ord('5'), 0)) + hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG), + ctypes.c_int(socket.SOCK_SEQPACKET), + ctypes.c_int(0)) + if hash_sockfd < 0: + raise IOError(ctypes.get_errno(), + "Failed to initialize MD5 socket") + + bind_result = _libc_bind(ctypes.c_int(hash_sockfd), + ctypes.pointer(sockaddr_setup), + ctypes.c_int(ctypes.sizeof(sockaddr_alg))) + if bind_result < 0: + os.close(hash_sockfd) + raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket") + + _bound_md5_sockfd = hash_sockfd + + md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0) + if md5_sockfd < 0: + raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") + + return md5_sockfd + + +# Flags for splice() and tee() +SPLICE_F_MOVE = 1 +SPLICE_F_NONBLOCK = 2 +SPLICE_F_MORE = 4 +SPLICE_F_GIFT = 8 + + +def splice(fd_in, off_in, fd_out, off_out, length, flags): + """ + Calls splice - a Linux-specific syscall for zero-copy data movement. + + On success, returns the number of bytes moved. + + On failure where errno is EWOULDBLOCK, returns None. + + On all other failures, raises IOError. + """ + global _libc_splice + if _libc_splice is None: + _libc_splice = load_libc_function('splice', fail_if_missing=True) + + ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in), + ctypes.c_int(fd_out), ctypes.c_long(off_out), + ctypes.c_int(length), ctypes.c_int(flags)) + if ret < 0: + err = ctypes.get_errno() + if err == errno.EWOULDBLOCK: + return None + else: + raise IOError(err, "splice() failed: %s" % os.strerror(err)) + return ret + + +def tee(fd_in, fd_out, length, flags): + """ + Calls tee - a Linux-specific syscall to let pipes share data. + + On success, returns the number of bytes "copied". + + On failure, raises IOError. + """ + global _libc_tee + if _libc_tee is None: + _libc_tee = load_libc_function('tee', fail_if_missing=True) + + ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out), + ctypes.c_int(length), ctypes.c_int(flags)) + if ret < 0: + err = ctypes.get_errno() + raise IOError(err, "tee() failed: %s" % os.strerror(err)) + return ret + + +def system_has_splice(): + global _libc_splice + try: + _libc_splice = load_libc_function('splice', fail_if_missing=True) + return True + except AttributeError: + return False diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 896225a978..62b38f8a22 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -32,6 +32,7 @@ are also not considered part of the backend API. import cPickle as pickle import errno +import fcntl import os import time import uuid @@ -46,6 +47,7 @@ from collections import defaultdict from xattr import getxattr, setxattr from eventlet import Timeout +from eventlet.hubs import trampoline from swift import gettext_ as _ from swift.common.constraints import check_mount @@ -53,7 +55,9 @@ from swift.common.request_helpers import is_sys_meta from swift.common.utils import mkdirs, Timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ - config_true_value, listdir, split_path, ismount, remove_file + config_true_value, listdir, split_path, ismount, remove_file, \ + get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \ + F_SETPIPE_SZ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ @@ -62,10 +66,12 @@ from swift.common.swob import multi_range_iterator from swift.common.storage_policy import get_policy_string, POLICIES from functools import partial + PICKLE_PROTOCOL = 2 ONE_WEEK = 604800 HASH_FILE = 'hashes.pkl' METADATA_KEY = 'user.swift.metadata' +DROP_CACHE_WINDOW = 1024 * 1024 # These are system-set metadata keys that cannot be changed with a POST. # They should be lowercase. DATAFILE_SYSTEM_META = set('content-length content-type deleted etag'.split()) @@ -75,6 +81,7 @@ TMP_BASE = 'tmp' get_data_dir = partial(get_policy_string, DATADIR_BASE) get_async_dir = partial(get_policy_string, ASYNCDIR_BASE) get_tmp_dir = partial(get_policy_string, TMP_BASE) +MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e' def read_metadata(fd): @@ -498,6 +505,37 @@ class DiskFileManager(object): self.threadpools = defaultdict( lambda: ThreadPool(nthreads=threads_per_disk)) + self.use_splice = False + self.pipe_size = None + + splice_available = system_has_splice() + + conf_wants_splice = config_true_value(conf.get('splice', 'no')) + # If the operator wants zero-copy with splice() but we don't have the + # requisite kernel support, complain so they can go fix it. + if conf_wants_splice and not splice_available: + self.logger.warn( + "Use of splice() requested (config says \"splice = %s\"), " + "but the system does not support it. " + "splice() will not be used." % conf.get('splice')) + elif conf_wants_splice and splice_available: + try: + sockfd = get_md5_socket() + os.close(sockfd) + except IOError as err: + # AF_ALG socket support was introduced in kernel 2.6.38; on + # systems with older kernels (or custom-built kernels lacking + # AF_ALG support), we can't use zero-copy. + if err.errno != errno.EAFNOSUPPORT: + raise + self.logger.warn("MD5 sockets not supported. " + "splice() will not be used.") + else: + self.use_splice = True + with open('/proc/sys/fs/pipe-max-size') as f: + max_pipe_size = int(f.read()) + self.pipe_size = min(max_pipe_size, self.disk_chunk_size) + def construct_dev_path(self, device): """ Construct the path to a device without checking if it is mounted. @@ -564,7 +602,9 @@ class DiskFileManager(object): raise DiskFileDeviceUnavailable() return DiskFile(self, dev_path, self.threadpools[device], partition, account, container, obj, - policy_idx=policy_idx, **kwargs) + policy_idx=policy_idx, + use_splice=self.use_splice, pipe_size=self.pipe_size, + **kwargs) def object_audit_location_generator(self, device_dirs=None): return object_audit_location_generator(self.devices, self.mount_check, @@ -830,11 +870,13 @@ class DiskFileReader(object): :param device_path: on-disk device path, used when quarantining an obj :param logger: logger caller wants this object to use :param quarantine_hook: 1-arg callable called w/reason when quarantined + :param use_splice: if true, use zero-copy splice() to send data + :param pipe_size: size of pipe buffer used in zero-copy operations :param keep_cache: should resulting reads be kept in the buffer cache """ def __init__(self, fp, data_file, obj_size, etag, threadpool, disk_chunk_size, keep_cache_size, device_path, logger, - quarantine_hook, keep_cache=False): + quarantine_hook, use_splice, pipe_size, keep_cache=False): # Parameter tracking self._fp = fp self._data_file = data_file @@ -845,6 +887,8 @@ class DiskFileReader(object): self._device_path = device_path self._logger = logger self._quarantine_hook = quarantine_hook + self._use_splice = use_splice + self._pipe_size = pipe_size if keep_cache: # Caller suggests we keep this in cache, only do it if the # object's size is less than the maximum. @@ -857,6 +901,7 @@ class DiskFileReader(object): self._bytes_read = 0 self._started_at_0 = False self._read_to_eof = False + self._md5_of_sent_bytes = None self._suppress_file_closing = False self._quarantined_dir = None @@ -877,7 +922,7 @@ class DiskFileReader(object): if self._iter_etag: self._iter_etag.update(chunk) self._bytes_read += len(chunk) - if self._bytes_read - dropped_cache > (1024 * 1024): + if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW: self._drop_cache(self._fp.fileno(), dropped_cache, self._bytes_read - dropped_cache) dropped_cache = self._bytes_read @@ -891,6 +936,109 @@ class DiskFileReader(object): if not self._suppress_file_closing: self.close() + def can_zero_copy_send(self): + return self._use_splice + + def zero_copy_send(self, wsockfd): + """ + Does some magic with splice() and tee() to move stuff from disk to + network without ever touching userspace. + + :param wsockfd: file descriptor (integer) of the socket out which to + send data + """ + # Note: if we ever add support for zero-copy ranged GET responses, + # we'll have to make this conditional. + self._started_at_0 = True + + rfd = self._fp.fileno() + client_rpipe, client_wpipe = os.pipe() + hash_rpipe, hash_wpipe = os.pipe() + md5_sockfd = get_md5_socket() + + # The actual amount allocated to the pipe may be rounded up to the + # nearest multiple of the page size. If we have the memory allocated, + # we may as well use it. + # + # Note: this will raise IOError on failure, so we don't bother + # checking the return value. + pipe_size = fcntl.fcntl(client_rpipe, F_SETPIPE_SZ, self._pipe_size) + fcntl.fcntl(hash_rpipe, F_SETPIPE_SZ, pipe_size) + + dropped_cache = 0 + self._bytes_read = 0 + try: + while True: + # Read data from disk to pipe + bytes_in_pipe = self._threadpool.run_in_thread( + splice, rfd, 0, client_wpipe, 0, pipe_size, 0) + if bytes_in_pipe == 0: + self._read_to_eof = True + self._drop_cache(rfd, dropped_cache, + self._bytes_read - dropped_cache) + break + self._bytes_read += bytes_in_pipe + + # "Copy" data from pipe A to pipe B (really just some pointer + # manipulation in the kernel, not actual copying). + bytes_copied = tee(client_rpipe, hash_wpipe, bytes_in_pipe, 0) + if bytes_copied != bytes_in_pipe: + # We teed data between two pipes of equal size, and the + # destination pipe was empty. If, somehow, the destination + # pipe was full before all the data was teed, we should + # fail here. If we don't raise an exception, then we will + # have the incorrect MD5 hash once the object has been + # sent out, causing a false-positive quarantine. + raise Exception("tee() failed: tried to move %d bytes, " + "but only moved %d" % + (bytes_in_pipe, bytes_copied)) + # Take the data and feed it into an in-kernel MD5 socket. The + # MD5 socket hashes data that is written to it. Reading from + # it yields the MD5 checksum of the written data. + # + # Note that we don't have to worry about splice() returning + # None here (which happens on EWOULDBLOCK); we're splicing + # $bytes_in_pipe bytes from a pipe with exactly that many + # bytes in it, so read won't block, and we're splicing it into + # an MD5 socket, which synchronously hashes any data sent to + # it, so writing won't block either. + hashed = splice(hash_rpipe, 0, md5_sockfd, 0, + bytes_in_pipe, SPLICE_F_MORE) + if hashed != bytes_in_pipe: + raise Exception("md5 socket didn't take all the data? " + "(tried to write %d, but wrote %d)" % + (bytes_in_pipe, hashed)) + + while bytes_in_pipe > 0: + sent = splice(client_rpipe, 0, wsockfd, 0, + bytes_in_pipe, 0) + if sent is None: # would have blocked + trampoline(wsockfd, write=True) + else: + bytes_in_pipe -= sent + + if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW: + self._drop_cache(rfd, dropped_cache, + self._bytes_read - dropped_cache) + dropped_cache = self._bytes_read + finally: + # Linux MD5 sockets return '00000000000000000000000000000000' for + # the checksum if you didn't write any bytes to them, instead of + # returning the correct value. + if self._bytes_read > 0: + bin_checksum = os.read(md5_sockfd, 16) + hex_checksum = ''.join("%02x" % ord(c) for c in bin_checksum) + else: + hex_checksum = MD5_OF_EMPTY_STRING + self._md5_of_sent_bytes = hex_checksum + + os.close(client_rpipe) + os.close(client_wpipe) + os.close(hash_rpipe) + os.close(hash_wpipe) + os.close(md5_sockfd) + self.close() + def app_iter_range(self, start, stop): """Returns an iterator over the data file for range (start, stop)""" if start or start == 0: @@ -942,15 +1090,18 @@ class DiskFileReader(object): def _handle_close_quarantine(self): """Check if file needs to be quarantined""" + if self._iter_etag and not self._md5_of_sent_bytes: + self._md5_of_sent_bytes = self._iter_etag.hexdigest() + if self._bytes_read != self._obj_size: self._quarantine( "Bytes read: %s, does not match metadata: %s" % ( self._bytes_read, self._obj_size)) - elif self._iter_etag and \ - self._etag != self._iter_etag.hexdigest(): + elif self._md5_of_sent_bytes and \ + self._etag != self._md5_of_sent_bytes: self._quarantine( "ETag %s and file's md5 %s do not match" % ( - self._etag, self._iter_etag.hexdigest())) + self._etag, self._md5_of_sent_bytes)) def close(self): """ @@ -998,17 +1149,21 @@ class DiskFile(object): :param obj: object name for the object :param _datadir: override the full datadir otherwise constructed here :param policy_idx: used to get the data dir when constructing it here + :param use_splice: if true, use zero-copy splice() to send data + :param pipe_size: size of pipe buffer used in zero-copy operations """ def __init__(self, mgr, device_path, threadpool, partition, account=None, container=None, obj=None, _datadir=None, - policy_idx=0): + policy_idx=0, use_splice=False, pipe_size=None): self._mgr = mgr self._device_path = device_path self._threadpool = threadpool or ThreadPool(nthreads=0) self._logger = mgr.logger self._disk_chunk_size = mgr.disk_chunk_size self._bytes_per_sync = mgr.bytes_per_sync + self._use_splice = use_splice + self._pipe_size = pipe_size if account and container and obj: self._name = '/' + '/'.join((account, container, obj)) self._account = account @@ -1377,7 +1532,8 @@ class DiskFile(object): self._fp, self._data_file, int(self._metadata['Content-Length']), self._metadata['ETag'], self._threadpool, self._disk_chunk_size, self._mgr.keep_cache_size, self._device_path, self._logger, - quarantine_hook=_quarantine_hook, keep_cache=keep_cache) + use_splice=self._use_splice, quarantine_hook=_quarantine_hook, + pipe_size=self._pipe_size, keep_cache=keep_cache) # At this point the reader object is now responsible for closing # the file pointer. self._fp = None diff --git a/swift/obj/server.py b/swift/obj/server.py index 0fa1d7622e..fcac395c02 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -25,7 +25,7 @@ import math from swift import gettext_ as _ from hashlib import md5 -from eventlet import sleep, Timeout +from eventlet import sleep, wsgi, Timeout from swift.common.utils import public, get_logger, \ config_true_value, timing_stats, replication, \ @@ -50,6 +50,19 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager +class EventletPlungerString(str): + """ + Eventlet won't send headers until it's accumulated at least + eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we + want to send the response body behind Eventlet's back, perhaps with some + zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi + to force the headers out, so we use an EventletPlungerString to empty out + all of Eventlet's buffers. + """ + def __len__(self): + return wsgi.MINIMUM_CHUNK_SIZE + 1 + + class ObjectController(object): """Implements the WSGI application for the Swift Object Server.""" @@ -710,7 +723,57 @@ class ObjectController(object): slow = self.slow - trans_time if slow > 0: sleep(slow) - return res(env, start_response) + + # To be able to zero-copy send the object, we need a few things. + # First, we have to be responding successfully to a GET, or else we're + # not sending the object. Second, we have to be able to extract the + # socket file descriptor from the WSGI input object. Third, the + # diskfile has to support zero-copy send. + # + # There's a good chance that this could work for 206 responses too, + # but the common case is sending the whole object, so we'll start + # there. + if req.method == 'GET' and res.status_int == 200 and \ + isinstance(env['wsgi.input'], wsgi.Input): + app_iter = getattr(res, 'app_iter', None) + checker = getattr(app_iter, 'can_zero_copy_send', None) + if checker and checker(): + # For any kind of zero-copy thing like sendfile or splice, we + # need the file descriptor. Eventlet doesn't provide a clean + # way of getting that, so we resort to this. + wsock = env['wsgi.input'].get_socket() + wsockfd = wsock.fileno() + + # Don't call zero_copy_send() until after we force the HTTP + # headers out of Eventlet and into the socket. + def zero_copy_iter(): + # If possible, set TCP_CORK so that headers don't + # immediately go on the wire, but instead, wait for some + # response body to make the TCP frames as large as + # possible (and hence as few packets as possible). + # + # On non-Linux systems, we might consider TCP_NODELAY, but + # since the only known zero-copy-capable diskfile uses + # Linux-specific syscalls, we'll defer that work until + # someone needs it. + if hasattr(socket, 'TCP_CORK'): + wsock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_CORK, 1) + yield EventletPlungerString() + try: + app_iter.zero_copy_send(wsockfd) + except Exception: + self.logger.exception("zero_copy_send() blew up") + raise + yield '' + + # Get headers ready to go out + res(env, start_response) + return zero_copy_iter() + else: + return res(env, start_response) + else: + return res(env, start_response) def global_conf_callback(preloaded_app_conf, global_conf): diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 505dfb0cb9..b869c9a1e3 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -376,7 +376,8 @@ class FakeLogger(logging.Logger): def _clear(self): self.log_dict = defaultdict(list) - self.lines_dict = defaultdict(list) + self.lines_dict = {'critical': [], 'error': [], 'info': [], + 'warning': [], 'debug': []} def _store_in(store_name): def stub_fn(self, *args, **kwargs): @@ -390,8 +391,17 @@ class FakeLogger(logging.Logger): return stub_fn def get_lines_for_level(self, level): + if level not in self.lines_dict: + raise KeyError( + "Invalid log level '%s'; valid levels are %s" % + (level, + ', '.join("'%s'" % lvl for lvl in sorted(self.lines_dict)))) return self.lines_dict[level] + def all_log_lines(self): + return dict((level, msgs) for level, msgs in self.lines_dict.items() + if len(msgs) > 0) + error = _store_and_log_in('error', logging.ERROR) info = _store_and_log_in('info', logging.INFO) warning = _store_and_log_in('warning', logging.WARNING) diff --git a/test/unit/common/test_container_sync_realms.py b/test/unit/common/test_container_sync_realms.py index cc300e780d..1ce8d489b8 100644 --- a/test/unit/common/test_container_sync_realms.py +++ b/test/unit/common/test_container_sync_realms.py @@ -28,7 +28,7 @@ class TestUtils(unittest.TestCase): logger = FakeLogger() csr = ContainerSyncRealms(unique, logger) self.assertEqual( - logger.lines_dict, + logger.all_log_lines(), {'debug': [ "Could not load '%s': [Errno 2] No such file or directory: " "'%s'" % (unique, unique)]}) @@ -45,7 +45,7 @@ class TestUtils(unittest.TestCase): csr = ContainerSyncRealms(fpath, logger) try: self.assertEqual( - logger.lines_dict, + logger.all_log_lines(), {'error': [ "Could not load '%s': [Errno 13] Permission denied: " "'%s'" % (fpath, fpath)]}) @@ -61,7 +61,7 @@ class TestUtils(unittest.TestCase): logger = FakeLogger() fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) - self.assertEqual(logger.lines_dict, {}) + self.assertEqual(logger.all_log_lines(), {}) self.assertEqual(csr.mtime_check_interval, 300) self.assertEqual(csr.realms(), []) @@ -73,7 +73,7 @@ class TestUtils(unittest.TestCase): fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) self.assertEqual( - logger.lines_dict, + logger.all_log_lines(), {'error': [ "Could not load '%s': File contains no section headers.\n" "file: %s, line: 1\n" @@ -92,7 +92,7 @@ cluster_dfw1 = http://dfw1.host/v1/ logger = FakeLogger() fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) - self.assertEqual(logger.lines_dict, {}) + self.assertEqual(logger.all_log_lines(), {}) self.assertEqual(csr.mtime_check_interval, 300) self.assertEqual(csr.realms(), ['US']) self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b') @@ -120,7 +120,7 @@ cluster_lon3 = http://lon3.host/v1/ logger = FakeLogger() fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) - self.assertEqual(logger.lines_dict, {}) + self.assertEqual(logger.all_log_lines(), {}) self.assertEqual(csr.mtime_check_interval, 60) self.assertEqual(sorted(csr.realms()), ['UK', 'US']) self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b') @@ -144,7 +144,7 @@ cluster_lon3 = http://lon3.host/v1/ logger = FakeLogger() fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) - self.assertEqual(logger.lines_dict, {}) + self.assertEqual(logger.all_log_lines(), {}) self.assertEqual(csr.mtime_check_interval, 300) self.assertEqual(csr.realms(), ['US']) self.assertEqual(csr.key('US'), None) @@ -163,7 +163,7 @@ mtime_check_interval = invalid fpath = os.path.join(tempdir, fname) csr = ContainerSyncRealms(fpath, logger) self.assertEqual( - logger.lines_dict, + logger.all_log_lines(), {'error': [ "Error in '%s' with mtime_check_interval: invalid literal " "for int() with base 10: 'invalid'" % fpath]}) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 6f91477d4f..685bcfe4d6 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1503,6 +1503,9 @@ class TestUtils(unittest.TestCase): utils.load_libc_function('printf'))) self.assert_(callable( utils.load_libc_function('some_not_real_function'))) + self.assertRaises(AttributeError, + utils.load_libc_function, 'some_not_real_function', + fail_if_missing=True) def test_readconf(self): conf = '''[section1] diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index b62dbf85cc..281dbe61da 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -36,6 +36,7 @@ from eventlet import tpool from test.unit import (FakeLogger, mock as unit_mock, temptree, patch_policies, debug_logger) +from nose import SkipTest from swift.obj import diskfile from swift.common import utils from swift.common.utils import hash_path, mkdirs, Timestamp @@ -951,6 +952,18 @@ class TestDiskFileManager(unittest.TestCase): lock_exc = err self.assertTrue(lock_exc is None) + def test_missing_splice_warning(self): + logger = FakeLogger() + with mock.patch('swift.obj.diskfile.system_has_splice', + lambda: False): + self.conf['splice'] = 'yes' + mgr = diskfile.DiskFileManager(self.conf, logger) + + warnings = logger.get_lines_for_level('warning') + self.assertTrue(len(warnings) > 0) + self.assertTrue('splice()' in warnings[-1]) + self.assertFalse(mgr.use_splice) + @patch_policies class TestDiskFile(unittest.TestCase): @@ -2183,6 +2196,50 @@ class TestDiskFile(unittest.TestCase): self.assertEquals(len(dl), 2) self.assertTrue(exp_name in set(dl)) + def _system_can_zero_copy(self): + if not utils.system_has_splice(): + return False + + try: + utils.get_md5_socket() + except IOError: + return False + + return True + + def test_zero_copy_cache_dropping(self): + if not self._system_can_zero_copy(): + raise SkipTest("zero-copy support is missing") + + self.conf['splice'] = 'on' + self.conf['keep_cache_size'] = 16384 + self.conf['disk_chunk_size'] = 4096 + self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger()) + + df = self._get_open_disk_file(fsize=16385) + reader = df.reader() + self.assertTrue(reader.can_zero_copy_send()) + with mock.patch("swift.obj.diskfile.drop_buffer_cache") as dbc: + with mock.patch("swift.obj.diskfile.DROP_CACHE_WINDOW", 4095): + with open('/dev/null', 'w') as devnull: + reader.zero_copy_send(devnull.fileno()) + self.assertEqual(len(dbc.mock_calls), 5) + + def test_zero_copy_turns_off_when_md5_sockets_not_supported(self): + if not self._system_can_zero_copy(): + raise SkipTest("zero-copy support is missing") + + self.conf['splice'] = 'on' + with mock.patch('swift.obj.diskfile.get_md5_socket') as mock_md5sock: + mock_md5sock.side_effect = IOError( + errno.EAFNOSUPPORT, "MD5 socket busted") + df = self._get_open_disk_file(fsize=128) + reader = df.reader() + self.assertFalse(reader.can_zero_copy_send()) + + log_lines = self.df_mgr.logger.get_lines_for_level('warning') + self.assert_('MD5 sockets' in log_lines[-1]) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index e2137484ef..85886d363c 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -33,6 +33,7 @@ import itertools import tempfile from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool +from eventlet.green import httplib from nose import SkipTest @@ -4373,5 +4374,136 @@ class TestObjectServer(unittest.TestCase): resp.close() +class TestZeroCopy(unittest.TestCase): + """Test the object server's zero-copy functionality""" + + def _system_can_zero_copy(self): + if not utils.system_has_splice(): + return False + + try: + utils.get_md5_socket() + except IOError: + return False + + return True + + def setUp(self): + if not self._system_can_zero_copy(): + raise SkipTest("zero-copy support is missing") + + self.testdir = mkdtemp(suffix="obj_server_zero_copy") + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + + conf = {'devices': self.testdir, + 'mount_check': 'false', + 'splice': 'yes', + 'disk_chunk_size': '4096'} + self.object_controller = object_server.ObjectController( + conf, logger=debug_logger()) + self.df_mgr = diskfile.DiskFileManager( + conf, self.object_controller.logger) + + listener = listen(('localhost', 0)) + port = listener.getsockname()[1] + self.wsgi_greenlet = spawn( + wsgi.server, listener, self.object_controller, NullLogger()) + + self.http_conn = httplib.HTTPConnection('localhost', port) + self.http_conn.connect() + + def tearDown(self): + """Tear down for testing swift.object.server.ObjectController""" + self.wsgi_greenlet.kill() + rmtree(self.testdir) + + def test_GET(self): + url_path = '/sda1/2100/a/c/o' + + self.http_conn.request('PUT', url_path, 'obj contents', + {'X-Timestamp': '127082564.24709'}) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 201) + response.read() + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 200) + contents = response.read() + self.assertEqual(contents, 'obj contents') + + def test_GET_big(self): + # Test with a large-ish object to make sure we handle full socket + # buffers correctly. + obj_contents = 'A' * 4 * 1024 * 1024 # 4 MiB + url_path = '/sda1/2100/a/c/o' + + self.http_conn.request('PUT', url_path, obj_contents, + {'X-Timestamp': '1402600322.52126'}) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 201) + response.read() + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 200) + contents = response.read() + self.assertEqual(contents, obj_contents) + + def test_quarantine(self): + obj_hash = hash_path('a', 'c', 'o') + url_path = '/sda1/2100/a/c/o' + ts = '1402601849.47475' + + self.http_conn.request('PUT', url_path, 'obj contents', + {'X-Timestamp': ts}) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 201) + response.read() + + # go goof up the file on disk + fname = os.path.join(self.testdir, 'sda1', 'objects', '2100', + obj_hash[-3:], obj_hash, ts + '.data') + + with open(fname, 'rb+') as fh: + fh.write('XYZ') + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 200) + contents = response.read() + self.assertEqual(contents, 'XYZ contents') + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + # it was quarantined by the previous request + self.assertEqual(response.status, 404) + response.read() + + def test_quarantine_on_well_formed_zero_byte_file(self): + # Make sure we work around an oddity in Linux's hash sockets + url_path = '/sda1/2100/a/c/o' + ts = '1402700497.71333' + + self.http_conn.request( + 'PUT', url_path, '', + {'X-Timestamp': ts, 'Content-Length': '0'}) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 201) + response.read() + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 200) + contents = response.read() + self.assertEqual(contents, '') + + self.http_conn.request('GET', url_path) + response = self.http_conn.getresponse() + self.assertEqual(response.status, 200) # still there + contents = response.read() + self.assertEqual(contents, '') + + if __name__ == '__main__': unittest.main()