Zero-copy object-server GET responses with splice()
This commit lets the object server use splice() and tee() to move data from disk to the network without ever copying it into user space. Requires Linux. Sorry, FreeBSD folks. You still have the old mechanism, as does anyone who doesn't want to use splice. This requires a relatively recent kernel (2.6.38+) to work, which includes the two most recent Ubuntu LTS releases (Precise and Trusty) as well as RHEL 7. However, it excludes Lucid and RHEL 6. On those systems, setting "splice = on" will result in warnings in the logs but no actual use of splice. Note that this only applies to GET responses without Range headers. It can easily be extended to single-range GET requests, but this commit leaves that for future work. Same goes for PUT requests, or at least non-chunked ones. On some real hardware I had laying around (not a VM), this produced a 37% reduction in CPU usage for GETs made directly to the object server. Measurements were done by looking at /proc/<pid>/stat, specifically the utime and stime fields (user and kernel CPU jiffies, respectively). Note: There is a Python module called "splicetee" available on PyPi, but it's licensed under the GPL, so it cannot easily be added to OpenStack's requirements. That's why this patch uses ctypes instead. Also fixed a long-standing annoyance in FakeLogger: >>> fake_logger.warn('stuff') >>> fake_logger.get_lines_for_level('warn') [] >>> This, of course, is because the correct log level is 'warning'. Now you get a KeyError if you call get_lines_for_level with a bogus log level. Change-Id: Ic6d6b833a5b04ca2019be94b1b90d941929d21c8
This commit is contained in:
parent
a81b2d2c74
commit
7d0e5ebe69
@ -127,6 +127,13 @@ use = egg:swift#object
|
||||
# an abort to occur.
|
||||
# replication_failure_threshold = 100
|
||||
# replication_failure_ratio = 1.0
|
||||
#
|
||||
# Use splice() for zero-copy object GETs. This requires Linux kernel
|
||||
# version 3.0 or greater. If you set "splice = yes" but the kernel
|
||||
# does not support it, error messages will appear in the object server
|
||||
# logs at startup, but your object servers should continue to function.
|
||||
#
|
||||
# splice = no
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -84,6 +84,11 @@ SysLogHandler.priority_map['NOTICE'] = 'notice'
|
||||
# These are lazily pulled from libc elsewhere
|
||||
_sys_fallocate = None
|
||||
_posix_fadvise = None
|
||||
_libc_socket = None
|
||||
_libc_bind = None
|
||||
_libc_accept = None
|
||||
_libc_splice = None
|
||||
_libc_tee = None
|
||||
|
||||
# If set to non-zero, fallocate routines will fail based on free space
|
||||
# available being at or below this amount, in bytes.
|
||||
@ -97,6 +102,13 @@ HASH_PATH_PREFIX = ''
|
||||
|
||||
SWIFT_CONF_FILE = '/etc/swift/swift.conf'
|
||||
|
||||
# These constants are Linux-specific, and Python doesn't seem to know
|
||||
# about them. We ask anyway just in case that ever gets fixed.
|
||||
#
|
||||
# The values were copied from the Linux 3.0 kernel headers.
|
||||
AF_ALG = getattr(socket, 'AF_ALG', 38)
|
||||
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
|
||||
|
||||
|
||||
class InvalidHashPathConfigError(ValueError):
|
||||
|
||||
@ -292,16 +304,22 @@ def validate_configuration():
|
||||
sys.exit("Error: %s" % e)
|
||||
|
||||
|
||||
def load_libc_function(func_name, log_error=True):
|
||||
def load_libc_function(func_name, log_error=True,
|
||||
fail_if_missing=False):
|
||||
"""
|
||||
Attempt to find the function in libc, otherwise return a no-op func.
|
||||
|
||||
:param func_name: name of the function to pull from libc.
|
||||
:param log_error: log an error when a function can't be found
|
||||
:param fail_if_missing: raise an exception when a function can't be found.
|
||||
Default behavior is to return a no-op function.
|
||||
"""
|
||||
try:
|
||||
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
|
||||
return getattr(libc, func_name)
|
||||
except AttributeError:
|
||||
if fail_if_missing:
|
||||
raise
|
||||
if log_error:
|
||||
logging.warn(_("Unable to locate %s in libc. Leaving as a "
|
||||
"no-op."), func_name)
|
||||
@ -3112,3 +3130,146 @@ def parse_content_disposition(header):
|
||||
attrs = attrs[len(m.group(0)):]
|
||||
attributes[m.group(1)] = m.group(2).strip('"')
|
||||
return header, attributes
|
||||
|
||||
|
||||
class sockaddr_alg(ctypes.Structure):
|
||||
_fields_ = [("salg_family", ctypes.c_ushort),
|
||||
("salg_type", ctypes.c_ubyte * 14),
|
||||
("salg_feat", ctypes.c_uint),
|
||||
("salg_mask", ctypes.c_uint),
|
||||
("salg_name", ctypes.c_ubyte * 64)]
|
||||
|
||||
|
||||
_bound_md5_sockfd = None
|
||||
|
||||
|
||||
def get_md5_socket():
|
||||
"""
|
||||
Get an MD5 socket file descriptor. One can MD5 data with it by writing it
|
||||
to the socket with os.write, then os.read the 16 bytes of the checksum out
|
||||
later.
|
||||
|
||||
NOTE: It is the caller's responsibility to ensure that os.close() is
|
||||
called on the returned file descriptor. This is a bare file descriptor,
|
||||
not a Python object. It doesn't close itself.
|
||||
"""
|
||||
|
||||
# Linux's AF_ALG sockets work like this:
|
||||
#
|
||||
# First, initialize a socket with socket() and bind(). This tells the
|
||||
# socket what algorithm to use, as well as setting up any necessary bits
|
||||
# like crypto keys. Of course, MD5 doesn't need any keys, so it's just the
|
||||
# algorithm name.
|
||||
#
|
||||
# Second, to hash some data, get a second socket by calling accept() on
|
||||
# the first socket. Write data to the socket, then when finished, read the
|
||||
# checksum from the socket and close it. This lets you checksum multiple
|
||||
# things without repeating all the setup code each time.
|
||||
#
|
||||
# Since we only need to bind() one socket, we do that here and save it for
|
||||
# future re-use. That way, we only use one file descriptor to get an MD5
|
||||
# socket instead of two, and we also get to save some syscalls.
|
||||
|
||||
global _bound_md5_sockfd
|
||||
global _libc_socket
|
||||
global _libc_bind
|
||||
global _libc_accept
|
||||
|
||||
if _libc_accept is None:
|
||||
_libc_accept = load_libc_function('accept', fail_if_missing=True)
|
||||
if _libc_socket is None:
|
||||
_libc_socket = load_libc_function('socket', fail_if_missing=True)
|
||||
if _libc_bind is None:
|
||||
_libc_bind = load_libc_function('bind', fail_if_missing=True)
|
||||
|
||||
# Do this at first call rather than at import time so that we don't use a
|
||||
# file descriptor on systems that aren't using any MD5 sockets.
|
||||
if _bound_md5_sockfd is None:
|
||||
sockaddr_setup = sockaddr_alg(
|
||||
AF_ALG,
|
||||
(ord('h'), ord('a'), ord('s'), ord('h'), 0),
|
||||
0, 0,
|
||||
(ord('m'), ord('d'), ord('5'), 0))
|
||||
hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG),
|
||||
ctypes.c_int(socket.SOCK_SEQPACKET),
|
||||
ctypes.c_int(0))
|
||||
if hash_sockfd < 0:
|
||||
raise IOError(ctypes.get_errno(),
|
||||
"Failed to initialize MD5 socket")
|
||||
|
||||
bind_result = _libc_bind(ctypes.c_int(hash_sockfd),
|
||||
ctypes.pointer(sockaddr_setup),
|
||||
ctypes.c_int(ctypes.sizeof(sockaddr_alg)))
|
||||
if bind_result < 0:
|
||||
os.close(hash_sockfd)
|
||||
raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket")
|
||||
|
||||
_bound_md5_sockfd = hash_sockfd
|
||||
|
||||
md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0)
|
||||
if md5_sockfd < 0:
|
||||
raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket")
|
||||
|
||||
return md5_sockfd
|
||||
|
||||
|
||||
# Flags for splice() and tee()
|
||||
SPLICE_F_MOVE = 1
|
||||
SPLICE_F_NONBLOCK = 2
|
||||
SPLICE_F_MORE = 4
|
||||
SPLICE_F_GIFT = 8
|
||||
|
||||
|
||||
def splice(fd_in, off_in, fd_out, off_out, length, flags):
|
||||
"""
|
||||
Calls splice - a Linux-specific syscall for zero-copy data movement.
|
||||
|
||||
On success, returns the number of bytes moved.
|
||||
|
||||
On failure where errno is EWOULDBLOCK, returns None.
|
||||
|
||||
On all other failures, raises IOError.
|
||||
"""
|
||||
global _libc_splice
|
||||
if _libc_splice is None:
|
||||
_libc_splice = load_libc_function('splice', fail_if_missing=True)
|
||||
|
||||
ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in),
|
||||
ctypes.c_int(fd_out), ctypes.c_long(off_out),
|
||||
ctypes.c_int(length), ctypes.c_int(flags))
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
if err == errno.EWOULDBLOCK:
|
||||
return None
|
||||
else:
|
||||
raise IOError(err, "splice() failed: %s" % os.strerror(err))
|
||||
return ret
|
||||
|
||||
|
||||
def tee(fd_in, fd_out, length, flags):
|
||||
"""
|
||||
Calls tee - a Linux-specific syscall to let pipes share data.
|
||||
|
||||
On success, returns the number of bytes "copied".
|
||||
|
||||
On failure, raises IOError.
|
||||
"""
|
||||
global _libc_tee
|
||||
if _libc_tee is None:
|
||||
_libc_tee = load_libc_function('tee', fail_if_missing=True)
|
||||
|
||||
ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out),
|
||||
ctypes.c_int(length), ctypes.c_int(flags))
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise IOError(err, "tee() failed: %s" % os.strerror(err))
|
||||
return ret
|
||||
|
||||
|
||||
def system_has_splice():
|
||||
global _libc_splice
|
||||
try:
|
||||
_libc_splice = load_libc_function('splice', fail_if_missing=True)
|
||||
return True
|
||||
except AttributeError:
|
||||
return False
|
||||
|
@ -32,6 +32,7 @@ are also not considered part of the backend API.
|
||||
|
||||
import cPickle as pickle
|
||||
import errno
|
||||
import fcntl
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
@ -46,6 +47,7 @@ from collections import defaultdict
|
||||
|
||||
from xattr import getxattr, setxattr
|
||||
from eventlet import Timeout
|
||||
from eventlet.hubs import trampoline
|
||||
|
||||
from swift import gettext_ as _
|
||||
from swift.common.constraints import check_mount
|
||||
@ -53,7 +55,9 @@ from swift.common.request_helpers import is_sys_meta
|
||||
from swift.common.utils import mkdirs, Timestamp, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, \
|
||||
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
|
||||
config_true_value, listdir, split_path, ismount, remove_file
|
||||
config_true_value, listdir, split_path, ismount, remove_file, \
|
||||
get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \
|
||||
F_SETPIPE_SZ
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
||||
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
|
||||
@ -62,10 +66,12 @@ from swift.common.swob import multi_range_iterator
|
||||
from swift.common.storage_policy import get_policy_string, POLICIES
|
||||
from functools import partial
|
||||
|
||||
|
||||
PICKLE_PROTOCOL = 2
|
||||
ONE_WEEK = 604800
|
||||
HASH_FILE = 'hashes.pkl'
|
||||
METADATA_KEY = 'user.swift.metadata'
|
||||
DROP_CACHE_WINDOW = 1024 * 1024
|
||||
# These are system-set metadata keys that cannot be changed with a POST.
|
||||
# They should be lowercase.
|
||||
DATAFILE_SYSTEM_META = set('content-length content-type deleted etag'.split())
|
||||
@ -75,6 +81,7 @@ TMP_BASE = 'tmp'
|
||||
get_data_dir = partial(get_policy_string, DATADIR_BASE)
|
||||
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
|
||||
get_tmp_dir = partial(get_policy_string, TMP_BASE)
|
||||
MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
|
||||
|
||||
def read_metadata(fd):
|
||||
@ -498,6 +505,37 @@ class DiskFileManager(object):
|
||||
self.threadpools = defaultdict(
|
||||
lambda: ThreadPool(nthreads=threads_per_disk))
|
||||
|
||||
self.use_splice = False
|
||||
self.pipe_size = None
|
||||
|
||||
splice_available = system_has_splice()
|
||||
|
||||
conf_wants_splice = config_true_value(conf.get('splice', 'no'))
|
||||
# If the operator wants zero-copy with splice() but we don't have the
|
||||
# requisite kernel support, complain so they can go fix it.
|
||||
if conf_wants_splice and not splice_available:
|
||||
self.logger.warn(
|
||||
"Use of splice() requested (config says \"splice = %s\"), "
|
||||
"but the system does not support it. "
|
||||
"splice() will not be used." % conf.get('splice'))
|
||||
elif conf_wants_splice and splice_available:
|
||||
try:
|
||||
sockfd = get_md5_socket()
|
||||
os.close(sockfd)
|
||||
except IOError as err:
|
||||
# AF_ALG socket support was introduced in kernel 2.6.38; on
|
||||
# systems with older kernels (or custom-built kernels lacking
|
||||
# AF_ALG support), we can't use zero-copy.
|
||||
if err.errno != errno.EAFNOSUPPORT:
|
||||
raise
|
||||
self.logger.warn("MD5 sockets not supported. "
|
||||
"splice() will not be used.")
|
||||
else:
|
||||
self.use_splice = True
|
||||
with open('/proc/sys/fs/pipe-max-size') as f:
|
||||
max_pipe_size = int(f.read())
|
||||
self.pipe_size = min(max_pipe_size, self.disk_chunk_size)
|
||||
|
||||
def construct_dev_path(self, device):
|
||||
"""
|
||||
Construct the path to a device without checking if it is mounted.
|
||||
@ -564,7 +602,9 @@ class DiskFileManager(object):
|
||||
raise DiskFileDeviceUnavailable()
|
||||
return DiskFile(self, dev_path, self.threadpools[device],
|
||||
partition, account, container, obj,
|
||||
policy_idx=policy_idx, **kwargs)
|
||||
policy_idx=policy_idx,
|
||||
use_splice=self.use_splice, pipe_size=self.pipe_size,
|
||||
**kwargs)
|
||||
|
||||
def object_audit_location_generator(self, device_dirs=None):
|
||||
return object_audit_location_generator(self.devices, self.mount_check,
|
||||
@ -830,11 +870,13 @@ class DiskFileReader(object):
|
||||
:param device_path: on-disk device path, used when quarantining an obj
|
||||
:param logger: logger caller wants this object to use
|
||||
:param quarantine_hook: 1-arg callable called w/reason when quarantined
|
||||
:param use_splice: if true, use zero-copy splice() to send data
|
||||
:param pipe_size: size of pipe buffer used in zero-copy operations
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag, threadpool,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
quarantine_hook, keep_cache=False):
|
||||
quarantine_hook, use_splice, pipe_size, keep_cache=False):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@ -845,6 +887,8 @@ class DiskFileReader(object):
|
||||
self._device_path = device_path
|
||||
self._logger = logger
|
||||
self._quarantine_hook = quarantine_hook
|
||||
self._use_splice = use_splice
|
||||
self._pipe_size = pipe_size
|
||||
if keep_cache:
|
||||
# Caller suggests we keep this in cache, only do it if the
|
||||
# object's size is less than the maximum.
|
||||
@ -857,6 +901,7 @@ class DiskFileReader(object):
|
||||
self._bytes_read = 0
|
||||
self._started_at_0 = False
|
||||
self._read_to_eof = False
|
||||
self._md5_of_sent_bytes = None
|
||||
self._suppress_file_closing = False
|
||||
self._quarantined_dir = None
|
||||
|
||||
@ -877,7 +922,7 @@ class DiskFileReader(object):
|
||||
if self._iter_etag:
|
||||
self._iter_etag.update(chunk)
|
||||
self._bytes_read += len(chunk)
|
||||
if self._bytes_read - dropped_cache > (1024 * 1024):
|
||||
if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
|
||||
self._drop_cache(self._fp.fileno(), dropped_cache,
|
||||
self._bytes_read - dropped_cache)
|
||||
dropped_cache = self._bytes_read
|
||||
@ -891,6 +936,109 @@ class DiskFileReader(object):
|
||||
if not self._suppress_file_closing:
|
||||
self.close()
|
||||
|
||||
def can_zero_copy_send(self):
|
||||
return self._use_splice
|
||||
|
||||
def zero_copy_send(self, wsockfd):
|
||||
"""
|
||||
Does some magic with splice() and tee() to move stuff from disk to
|
||||
network without ever touching userspace.
|
||||
|
||||
:param wsockfd: file descriptor (integer) of the socket out which to
|
||||
send data
|
||||
"""
|
||||
# Note: if we ever add support for zero-copy ranged GET responses,
|
||||
# we'll have to make this conditional.
|
||||
self._started_at_0 = True
|
||||
|
||||
rfd = self._fp.fileno()
|
||||
client_rpipe, client_wpipe = os.pipe()
|
||||
hash_rpipe, hash_wpipe = os.pipe()
|
||||
md5_sockfd = get_md5_socket()
|
||||
|
||||
# The actual amount allocated to the pipe may be rounded up to the
|
||||
# nearest multiple of the page size. If we have the memory allocated,
|
||||
# we may as well use it.
|
||||
#
|
||||
# Note: this will raise IOError on failure, so we don't bother
|
||||
# checking the return value.
|
||||
pipe_size = fcntl.fcntl(client_rpipe, F_SETPIPE_SZ, self._pipe_size)
|
||||
fcntl.fcntl(hash_rpipe, F_SETPIPE_SZ, pipe_size)
|
||||
|
||||
dropped_cache = 0
|
||||
self._bytes_read = 0
|
||||
try:
|
||||
while True:
|
||||
# Read data from disk to pipe
|
||||
bytes_in_pipe = self._threadpool.run_in_thread(
|
||||
splice, rfd, 0, client_wpipe, 0, pipe_size, 0)
|
||||
if bytes_in_pipe == 0:
|
||||
self._read_to_eof = True
|
||||
self._drop_cache(rfd, dropped_cache,
|
||||
self._bytes_read - dropped_cache)
|
||||
break
|
||||
self._bytes_read += bytes_in_pipe
|
||||
|
||||
# "Copy" data from pipe A to pipe B (really just some pointer
|
||||
# manipulation in the kernel, not actual copying).
|
||||
bytes_copied = tee(client_rpipe, hash_wpipe, bytes_in_pipe, 0)
|
||||
if bytes_copied != bytes_in_pipe:
|
||||
# We teed data between two pipes of equal size, and the
|
||||
# destination pipe was empty. If, somehow, the destination
|
||||
# pipe was full before all the data was teed, we should
|
||||
# fail here. If we don't raise an exception, then we will
|
||||
# have the incorrect MD5 hash once the object has been
|
||||
# sent out, causing a false-positive quarantine.
|
||||
raise Exception("tee() failed: tried to move %d bytes, "
|
||||
"but only moved %d" %
|
||||
(bytes_in_pipe, bytes_copied))
|
||||
# Take the data and feed it into an in-kernel MD5 socket. The
|
||||
# MD5 socket hashes data that is written to it. Reading from
|
||||
# it yields the MD5 checksum of the written data.
|
||||
#
|
||||
# Note that we don't have to worry about splice() returning
|
||||
# None here (which happens on EWOULDBLOCK); we're splicing
|
||||
# $bytes_in_pipe bytes from a pipe with exactly that many
|
||||
# bytes in it, so read won't block, and we're splicing it into
|
||||
# an MD5 socket, which synchronously hashes any data sent to
|
||||
# it, so writing won't block either.
|
||||
hashed = splice(hash_rpipe, 0, md5_sockfd, 0,
|
||||
bytes_in_pipe, SPLICE_F_MORE)
|
||||
if hashed != bytes_in_pipe:
|
||||
raise Exception("md5 socket didn't take all the data? "
|
||||
"(tried to write %d, but wrote %d)" %
|
||||
(bytes_in_pipe, hashed))
|
||||
|
||||
while bytes_in_pipe > 0:
|
||||
sent = splice(client_rpipe, 0, wsockfd, 0,
|
||||
bytes_in_pipe, 0)
|
||||
if sent is None: # would have blocked
|
||||
trampoline(wsockfd, write=True)
|
||||
else:
|
||||
bytes_in_pipe -= sent
|
||||
|
||||
if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
|
||||
self._drop_cache(rfd, dropped_cache,
|
||||
self._bytes_read - dropped_cache)
|
||||
dropped_cache = self._bytes_read
|
||||
finally:
|
||||
# Linux MD5 sockets return '00000000000000000000000000000000' for
|
||||
# the checksum if you didn't write any bytes to them, instead of
|
||||
# returning the correct value.
|
||||
if self._bytes_read > 0:
|
||||
bin_checksum = os.read(md5_sockfd, 16)
|
||||
hex_checksum = ''.join("%02x" % ord(c) for c in bin_checksum)
|
||||
else:
|
||||
hex_checksum = MD5_OF_EMPTY_STRING
|
||||
self._md5_of_sent_bytes = hex_checksum
|
||||
|
||||
os.close(client_rpipe)
|
||||
os.close(client_wpipe)
|
||||
os.close(hash_rpipe)
|
||||
os.close(hash_wpipe)
|
||||
os.close(md5_sockfd)
|
||||
self.close()
|
||||
|
||||
def app_iter_range(self, start, stop):
|
||||
"""Returns an iterator over the data file for range (start, stop)"""
|
||||
if start or start == 0:
|
||||
@ -942,15 +1090,18 @@ class DiskFileReader(object):
|
||||
|
||||
def _handle_close_quarantine(self):
|
||||
"""Check if file needs to be quarantined"""
|
||||
if self._iter_etag and not self._md5_of_sent_bytes:
|
||||
self._md5_of_sent_bytes = self._iter_etag.hexdigest()
|
||||
|
||||
if self._bytes_read != self._obj_size:
|
||||
self._quarantine(
|
||||
"Bytes read: %s, does not match metadata: %s" % (
|
||||
self._bytes_read, self._obj_size))
|
||||
elif self._iter_etag and \
|
||||
self._etag != self._iter_etag.hexdigest():
|
||||
elif self._md5_of_sent_bytes and \
|
||||
self._etag != self._md5_of_sent_bytes:
|
||||
self._quarantine(
|
||||
"ETag %s and file's md5 %s do not match" % (
|
||||
self._etag, self._iter_etag.hexdigest()))
|
||||
self._etag, self._md5_of_sent_bytes))
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
@ -998,17 +1149,21 @@ class DiskFile(object):
|
||||
:param obj: object name for the object
|
||||
:param _datadir: override the full datadir otherwise constructed here
|
||||
:param policy_idx: used to get the data dir when constructing it here
|
||||
:param use_splice: if true, use zero-copy splice() to send data
|
||||
:param pipe_size: size of pipe buffer used in zero-copy operations
|
||||
"""
|
||||
|
||||
def __init__(self, mgr, device_path, threadpool, partition,
|
||||
account=None, container=None, obj=None, _datadir=None,
|
||||
policy_idx=0):
|
||||
policy_idx=0, use_splice=False, pipe_size=None):
|
||||
self._mgr = mgr
|
||||
self._device_path = device_path
|
||||
self._threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
self._logger = mgr.logger
|
||||
self._disk_chunk_size = mgr.disk_chunk_size
|
||||
self._bytes_per_sync = mgr.bytes_per_sync
|
||||
self._use_splice = use_splice
|
||||
self._pipe_size = pipe_size
|
||||
if account and container and obj:
|
||||
self._name = '/' + '/'.join((account, container, obj))
|
||||
self._account = account
|
||||
@ -1377,7 +1532,8 @@ class DiskFile(object):
|
||||
self._fp, self._data_file, int(self._metadata['Content-Length']),
|
||||
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
|
||||
self._mgr.keep_cache_size, self._device_path, self._logger,
|
||||
quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
|
||||
use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
|
||||
pipe_size=self._pipe_size, keep_cache=keep_cache)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
self._fp = None
|
||||
|
@ -25,7 +25,7 @@ import math
|
||||
from swift import gettext_ as _
|
||||
from hashlib import md5
|
||||
|
||||
from eventlet import sleep, Timeout
|
||||
from eventlet import sleep, wsgi, Timeout
|
||||
|
||||
from swift.common.utils import public, get_logger, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
@ -50,6 +50,19 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
|
||||
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
|
||||
|
||||
|
||||
class EventletPlungerString(str):
|
||||
"""
|
||||
Eventlet won't send headers until it's accumulated at least
|
||||
eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we
|
||||
want to send the response body behind Eventlet's back, perhaps with some
|
||||
zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi
|
||||
to force the headers out, so we use an EventletPlungerString to empty out
|
||||
all of Eventlet's buffers.
|
||||
"""
|
||||
def __len__(self):
|
||||
return wsgi.MINIMUM_CHUNK_SIZE + 1
|
||||
|
||||
|
||||
class ObjectController(object):
|
||||
"""Implements the WSGI application for the Swift Object Server."""
|
||||
|
||||
@ -710,7 +723,57 @@ class ObjectController(object):
|
||||
slow = self.slow - trans_time
|
||||
if slow > 0:
|
||||
sleep(slow)
|
||||
return res(env, start_response)
|
||||
|
||||
# To be able to zero-copy send the object, we need a few things.
|
||||
# First, we have to be responding successfully to a GET, or else we're
|
||||
# not sending the object. Second, we have to be able to extract the
|
||||
# socket file descriptor from the WSGI input object. Third, the
|
||||
# diskfile has to support zero-copy send.
|
||||
#
|
||||
# There's a good chance that this could work for 206 responses too,
|
||||
# but the common case is sending the whole object, so we'll start
|
||||
# there.
|
||||
if req.method == 'GET' and res.status_int == 200 and \
|
||||
isinstance(env['wsgi.input'], wsgi.Input):
|
||||
app_iter = getattr(res, 'app_iter', None)
|
||||
checker = getattr(app_iter, 'can_zero_copy_send', None)
|
||||
if checker and checker():
|
||||
# For any kind of zero-copy thing like sendfile or splice, we
|
||||
# need the file descriptor. Eventlet doesn't provide a clean
|
||||
# way of getting that, so we resort to this.
|
||||
wsock = env['wsgi.input'].get_socket()
|
||||
wsockfd = wsock.fileno()
|
||||
|
||||
# Don't call zero_copy_send() until after we force the HTTP
|
||||
# headers out of Eventlet and into the socket.
|
||||
def zero_copy_iter():
|
||||
# If possible, set TCP_CORK so that headers don't
|
||||
# immediately go on the wire, but instead, wait for some
|
||||
# response body to make the TCP frames as large as
|
||||
# possible (and hence as few packets as possible).
|
||||
#
|
||||
# On non-Linux systems, we might consider TCP_NODELAY, but
|
||||
# since the only known zero-copy-capable diskfile uses
|
||||
# Linux-specific syscalls, we'll defer that work until
|
||||
# someone needs it.
|
||||
if hasattr(socket, 'TCP_CORK'):
|
||||
wsock.setsockopt(socket.IPPROTO_TCP,
|
||||
socket.TCP_CORK, 1)
|
||||
yield EventletPlungerString()
|
||||
try:
|
||||
app_iter.zero_copy_send(wsockfd)
|
||||
except Exception:
|
||||
self.logger.exception("zero_copy_send() blew up")
|
||||
raise
|
||||
yield ''
|
||||
|
||||
# Get headers ready to go out
|
||||
res(env, start_response)
|
||||
return zero_copy_iter()
|
||||
else:
|
||||
return res(env, start_response)
|
||||
else:
|
||||
return res(env, start_response)
|
||||
|
||||
|
||||
def global_conf_callback(preloaded_app_conf, global_conf):
|
||||
|
@ -376,7 +376,8 @@ class FakeLogger(logging.Logger):
|
||||
|
||||
def _clear(self):
|
||||
self.log_dict = defaultdict(list)
|
||||
self.lines_dict = defaultdict(list)
|
||||
self.lines_dict = {'critical': [], 'error': [], 'info': [],
|
||||
'warning': [], 'debug': []}
|
||||
|
||||
def _store_in(store_name):
|
||||
def stub_fn(self, *args, **kwargs):
|
||||
@ -390,8 +391,17 @@ class FakeLogger(logging.Logger):
|
||||
return stub_fn
|
||||
|
||||
def get_lines_for_level(self, level):
|
||||
if level not in self.lines_dict:
|
||||
raise KeyError(
|
||||
"Invalid log level '%s'; valid levels are %s" %
|
||||
(level,
|
||||
', '.join("'%s'" % lvl for lvl in sorted(self.lines_dict))))
|
||||
return self.lines_dict[level]
|
||||
|
||||
def all_log_lines(self):
|
||||
return dict((level, msgs) for level, msgs in self.lines_dict.items()
|
||||
if len(msgs) > 0)
|
||||
|
||||
error = _store_and_log_in('error', logging.ERROR)
|
||||
info = _store_and_log_in('info', logging.INFO)
|
||||
warning = _store_and_log_in('warning', logging.WARNING)
|
||||
|
@ -28,7 +28,7 @@ class TestUtils(unittest.TestCase):
|
||||
logger = FakeLogger()
|
||||
csr = ContainerSyncRealms(unique, logger)
|
||||
self.assertEqual(
|
||||
logger.lines_dict,
|
||||
logger.all_log_lines(),
|
||||
{'debug': [
|
||||
"Could not load '%s': [Errno 2] No such file or directory: "
|
||||
"'%s'" % (unique, unique)]})
|
||||
@ -45,7 +45,7 @@ class TestUtils(unittest.TestCase):
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
try:
|
||||
self.assertEqual(
|
||||
logger.lines_dict,
|
||||
logger.all_log_lines(),
|
||||
{'error': [
|
||||
"Could not load '%s': [Errno 13] Permission denied: "
|
||||
"'%s'" % (fpath, fpath)]})
|
||||
@ -61,7 +61,7 @@ class TestUtils(unittest.TestCase):
|
||||
logger = FakeLogger()
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(logger.lines_dict, {})
|
||||
self.assertEqual(logger.all_log_lines(), {})
|
||||
self.assertEqual(csr.mtime_check_interval, 300)
|
||||
self.assertEqual(csr.realms(), [])
|
||||
|
||||
@ -73,7 +73,7 @@ class TestUtils(unittest.TestCase):
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(
|
||||
logger.lines_dict,
|
||||
logger.all_log_lines(),
|
||||
{'error': [
|
||||
"Could not load '%s': File contains no section headers.\n"
|
||||
"file: %s, line: 1\n"
|
||||
@ -92,7 +92,7 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
logger = FakeLogger()
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(logger.lines_dict, {})
|
||||
self.assertEqual(logger.all_log_lines(), {})
|
||||
self.assertEqual(csr.mtime_check_interval, 300)
|
||||
self.assertEqual(csr.realms(), ['US'])
|
||||
self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
|
||||
@ -120,7 +120,7 @@ cluster_lon3 = http://lon3.host/v1/
|
||||
logger = FakeLogger()
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(logger.lines_dict, {})
|
||||
self.assertEqual(logger.all_log_lines(), {})
|
||||
self.assertEqual(csr.mtime_check_interval, 60)
|
||||
self.assertEqual(sorted(csr.realms()), ['UK', 'US'])
|
||||
self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
|
||||
@ -144,7 +144,7 @@ cluster_lon3 = http://lon3.host/v1/
|
||||
logger = FakeLogger()
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(logger.lines_dict, {})
|
||||
self.assertEqual(logger.all_log_lines(), {})
|
||||
self.assertEqual(csr.mtime_check_interval, 300)
|
||||
self.assertEqual(csr.realms(), ['US'])
|
||||
self.assertEqual(csr.key('US'), None)
|
||||
@ -163,7 +163,7 @@ mtime_check_interval = invalid
|
||||
fpath = os.path.join(tempdir, fname)
|
||||
csr = ContainerSyncRealms(fpath, logger)
|
||||
self.assertEqual(
|
||||
logger.lines_dict,
|
||||
logger.all_log_lines(),
|
||||
{'error': [
|
||||
"Error in '%s' with mtime_check_interval: invalid literal "
|
||||
"for int() with base 10: 'invalid'" % fpath]})
|
||||
|
@ -1503,6 +1503,9 @@ class TestUtils(unittest.TestCase):
|
||||
utils.load_libc_function('printf')))
|
||||
self.assert_(callable(
|
||||
utils.load_libc_function('some_not_real_function')))
|
||||
self.assertRaises(AttributeError,
|
||||
utils.load_libc_function, 'some_not_real_function',
|
||||
fail_if_missing=True)
|
||||
|
||||
def test_readconf(self):
|
||||
conf = '''[section1]
|
||||
|
@ -36,6 +36,7 @@ from eventlet import tpool
|
||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||
patch_policies, debug_logger)
|
||||
|
||||
from nose import SkipTest
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, Timestamp
|
||||
@ -951,6 +952,18 @@ class TestDiskFileManager(unittest.TestCase):
|
||||
lock_exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
|
||||
def test_missing_splice_warning(self):
|
||||
logger = FakeLogger()
|
||||
with mock.patch('swift.obj.diskfile.system_has_splice',
|
||||
lambda: False):
|
||||
self.conf['splice'] = 'yes'
|
||||
mgr = diskfile.DiskFileManager(self.conf, logger)
|
||||
|
||||
warnings = logger.get_lines_for_level('warning')
|
||||
self.assertTrue(len(warnings) > 0)
|
||||
self.assertTrue('splice()' in warnings[-1])
|
||||
self.assertFalse(mgr.use_splice)
|
||||
|
||||
|
||||
@patch_policies
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
@ -2183,6 +2196,50 @@ class TestDiskFile(unittest.TestCase):
|
||||
self.assertEquals(len(dl), 2)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
|
||||
def _system_can_zero_copy(self):
|
||||
if not utils.system_has_splice():
|
||||
return False
|
||||
|
||||
try:
|
||||
utils.get_md5_socket()
|
||||
except IOError:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def test_zero_copy_cache_dropping(self):
|
||||
if not self._system_can_zero_copy():
|
||||
raise SkipTest("zero-copy support is missing")
|
||||
|
||||
self.conf['splice'] = 'on'
|
||||
self.conf['keep_cache_size'] = 16384
|
||||
self.conf['disk_chunk_size'] = 4096
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
|
||||
|
||||
df = self._get_open_disk_file(fsize=16385)
|
||||
reader = df.reader()
|
||||
self.assertTrue(reader.can_zero_copy_send())
|
||||
with mock.patch("swift.obj.diskfile.drop_buffer_cache") as dbc:
|
||||
with mock.patch("swift.obj.diskfile.DROP_CACHE_WINDOW", 4095):
|
||||
with open('/dev/null', 'w') as devnull:
|
||||
reader.zero_copy_send(devnull.fileno())
|
||||
self.assertEqual(len(dbc.mock_calls), 5)
|
||||
|
||||
def test_zero_copy_turns_off_when_md5_sockets_not_supported(self):
|
||||
if not self._system_can_zero_copy():
|
||||
raise SkipTest("zero-copy support is missing")
|
||||
|
||||
self.conf['splice'] = 'on'
|
||||
with mock.patch('swift.obj.diskfile.get_md5_socket') as mock_md5sock:
|
||||
mock_md5sock.side_effect = IOError(
|
||||
errno.EAFNOSUPPORT, "MD5 socket busted")
|
||||
df = self._get_open_disk_file(fsize=128)
|
||||
reader = df.reader()
|
||||
self.assertFalse(reader.can_zero_copy_send())
|
||||
|
||||
log_lines = self.df_mgr.logger.get_lines_for_level('warning')
|
||||
self.assert_('MD5 sockets' in log_lines[-1])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -33,6 +33,7 @@ import itertools
|
||||
import tempfile
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
|
||||
from eventlet.green import httplib
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
@ -4373,5 +4374,136 @@ class TestObjectServer(unittest.TestCase):
|
||||
resp.close()
|
||||
|
||||
|
||||
class TestZeroCopy(unittest.TestCase):
|
||||
"""Test the object server's zero-copy functionality"""
|
||||
|
||||
def _system_can_zero_copy(self):
|
||||
if not utils.system_has_splice():
|
||||
return False
|
||||
|
||||
try:
|
||||
utils.get_md5_socket()
|
||||
except IOError:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def setUp(self):
|
||||
if not self._system_can_zero_copy():
|
||||
raise SkipTest("zero-copy support is missing")
|
||||
|
||||
self.testdir = mkdtemp(suffix="obj_server_zero_copy")
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
|
||||
conf = {'devices': self.testdir,
|
||||
'mount_check': 'false',
|
||||
'splice': 'yes',
|
||||
'disk_chunk_size': '4096'}
|
||||
self.object_controller = object_server.ObjectController(
|
||||
conf, logger=debug_logger())
|
||||
self.df_mgr = diskfile.DiskFileManager(
|
||||
conf, self.object_controller.logger)
|
||||
|
||||
listener = listen(('localhost', 0))
|
||||
port = listener.getsockname()[1]
|
||||
self.wsgi_greenlet = spawn(
|
||||
wsgi.server, listener, self.object_controller, NullLogger())
|
||||
|
||||
self.http_conn = httplib.HTTPConnection('localhost', port)
|
||||
self.http_conn.connect()
|
||||
|
||||
def tearDown(self):
|
||||
"""Tear down for testing swift.object.server.ObjectController"""
|
||||
self.wsgi_greenlet.kill()
|
||||
rmtree(self.testdir)
|
||||
|
||||
def test_GET(self):
|
||||
url_path = '/sda1/2100/a/c/o'
|
||||
|
||||
self.http_conn.request('PUT', url_path, 'obj contents',
|
||||
{'X-Timestamp': '127082564.24709'})
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 201)
|
||||
response.read()
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 200)
|
||||
contents = response.read()
|
||||
self.assertEqual(contents, 'obj contents')
|
||||
|
||||
def test_GET_big(self):
|
||||
# Test with a large-ish object to make sure we handle full socket
|
||||
# buffers correctly.
|
||||
obj_contents = 'A' * 4 * 1024 * 1024 # 4 MiB
|
||||
url_path = '/sda1/2100/a/c/o'
|
||||
|
||||
self.http_conn.request('PUT', url_path, obj_contents,
|
||||
{'X-Timestamp': '1402600322.52126'})
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 201)
|
||||
response.read()
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 200)
|
||||
contents = response.read()
|
||||
self.assertEqual(contents, obj_contents)
|
||||
|
||||
def test_quarantine(self):
|
||||
obj_hash = hash_path('a', 'c', 'o')
|
||||
url_path = '/sda1/2100/a/c/o'
|
||||
ts = '1402601849.47475'
|
||||
|
||||
self.http_conn.request('PUT', url_path, 'obj contents',
|
||||
{'X-Timestamp': ts})
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 201)
|
||||
response.read()
|
||||
|
||||
# go goof up the file on disk
|
||||
fname = os.path.join(self.testdir, 'sda1', 'objects', '2100',
|
||||
obj_hash[-3:], obj_hash, ts + '.data')
|
||||
|
||||
with open(fname, 'rb+') as fh:
|
||||
fh.write('XYZ')
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 200)
|
||||
contents = response.read()
|
||||
self.assertEqual(contents, 'XYZ contents')
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
# it was quarantined by the previous request
|
||||
self.assertEqual(response.status, 404)
|
||||
response.read()
|
||||
|
||||
def test_quarantine_on_well_formed_zero_byte_file(self):
|
||||
# Make sure we work around an oddity in Linux's hash sockets
|
||||
url_path = '/sda1/2100/a/c/o'
|
||||
ts = '1402700497.71333'
|
||||
|
||||
self.http_conn.request(
|
||||
'PUT', url_path, '',
|
||||
{'X-Timestamp': ts, 'Content-Length': '0'})
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 201)
|
||||
response.read()
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 200)
|
||||
contents = response.read()
|
||||
self.assertEqual(contents, '')
|
||||
|
||||
self.http_conn.request('GET', url_path)
|
||||
response = self.http_conn.getresponse()
|
||||
self.assertEqual(response.status, 200) # still there
|
||||
contents = response.read()
|
||||
self.assertEqual(contents, '')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user