Merge "Zero-copy object-server GET responses with splice()"
commit 0d502de2f5
@@ -127,6 +127,13 @@ use = egg:swift#object
# an abort to occur.
# replication_failure_threshold = 100
# replication_failure_ratio = 1.0
#
# Use splice() for zero-copy object GETs. This requires Linux kernel
# version 3.0 or greater. If you set "splice = yes" but the kernel
# does not support it, error messages will appear in the object server
# logs at startup, but your object servers should continue to function.
#
# splice = no
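
For illustration, a hedged sketch (not part of this diff) of what an operator
would put in the object-server config to turn this on; the section name is an
assumption taken from the "use = egg:swift#object" context line above:

    [app:object-server]
    use = egg:swift#object
    splice = yes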

[filter:healthcheck]
use = egg:swift#healthcheck

@@ -84,6 +84,11 @@ SysLogHandler.priority_map['NOTICE'] = 'notice'
# These are lazily pulled from libc elsewhere
_sys_fallocate = None
_posix_fadvise = None
_libc_socket = None
_libc_bind = None
_libc_accept = None
_libc_splice = None
_libc_tee = None

# If set to non-zero, fallocate routines will fail based on free space
# available being at or below this amount, in bytes.
@@ -97,6 +102,13 @@ HASH_PATH_PREFIX = ''

SWIFT_CONF_FILE = '/etc/swift/swift.conf'

# These constants are Linux-specific, and Python doesn't seem to know
# about them. We ask anyway just in case that ever gets fixed.
#
# The values were copied from the Linux 3.0 kernel headers.
AF_ALG = getattr(socket, 'AF_ALG', 38)
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)


class InvalidHashPathConfigError(ValueError):

@@ -292,16 +304,22 @@ def validate_configuration():
        sys.exit("Error: %s" % e)


def load_libc_function(func_name, log_error=True):
def load_libc_function(func_name, log_error=True,
                       fail_if_missing=False):
    """
    Attempt to find the function in libc, otherwise return a no-op func.

    :param func_name: name of the function to pull from libc.
    :param log_error: log an error when a function can't be found
    :param fail_if_missing: raise an exception when a function can't be found.
                            Default behavior is to return a no-op function.
    """
    try:
        libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
        return getattr(libc, func_name)
    except AttributeError:
        if fail_if_missing:
            raise
        if log_error:
            logging.warn(_("Unable to locate %s in libc. Leaving as a "
                           "no-op."), func_name)
@@ -3123,3 +3141,146 @@ def parse_content_disposition(header):
        attrs = attrs[len(m.group(0)):]
        attributes[m.group(1)] = m.group(2).strip('"')
    return header, attributes


class sockaddr_alg(ctypes.Structure):
    _fields_ = [("salg_family", ctypes.c_ushort),
                ("salg_type", ctypes.c_ubyte * 14),
                ("salg_feat", ctypes.c_uint),
                ("salg_mask", ctypes.c_uint),
                ("salg_name", ctypes.c_ubyte * 64)]


_bound_md5_sockfd = None


def get_md5_socket():
    """
    Get an MD5 socket file descriptor. One can MD5 data with it by writing it
    to the socket with os.write, then os.read the 16 bytes of the checksum out
    later.

    NOTE: It is the caller's responsibility to ensure that os.close() is
    called on the returned file descriptor. This is a bare file descriptor,
    not a Python object. It doesn't close itself.
    """

    # Linux's AF_ALG sockets work like this:
    #
    # First, initialize a socket with socket() and bind(). This tells the
    # socket what algorithm to use, as well as setting up any necessary bits
    # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the
    # algorithm name.
    #
    # Second, to hash some data, get a second socket by calling accept() on
    # the first socket. Write data to the socket, then when finished, read the
    # checksum from the socket and close it. This lets you checksum multiple
    # things without repeating all the setup code each time.
    #
    # Since we only need to bind() one socket, we do that here and save it for
    # future re-use. That way, we only use one file descriptor to get an MD5
    # socket instead of two, and we also get to save some syscalls.

    global _bound_md5_sockfd
    global _libc_socket
    global _libc_bind
    global _libc_accept

    if _libc_accept is None:
        _libc_accept = load_libc_function('accept', fail_if_missing=True)
    if _libc_socket is None:
        _libc_socket = load_libc_function('socket', fail_if_missing=True)
    if _libc_bind is None:
        _libc_bind = load_libc_function('bind', fail_if_missing=True)

    # Do this at first call rather than at import time so that we don't use a
    # file descriptor on systems that aren't using any MD5 sockets.
    if _bound_md5_sockfd is None:
        sockaddr_setup = sockaddr_alg(
            AF_ALG,
            (ord('h'), ord('a'), ord('s'), ord('h'), 0),
            0, 0,
            (ord('m'), ord('d'), ord('5'), 0))
        hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG),
                                   ctypes.c_int(socket.SOCK_SEQPACKET),
                                   ctypes.c_int(0))
        if hash_sockfd < 0:
            raise IOError(ctypes.get_errno(),
                          "Failed to initialize MD5 socket")

        bind_result = _libc_bind(ctypes.c_int(hash_sockfd),
                                 ctypes.pointer(sockaddr_setup),
                                 ctypes.c_int(ctypes.sizeof(sockaddr_alg)))
        if bind_result < 0:
            os.close(hash_sockfd)
            raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket")

        _bound_md5_sockfd = hash_sockfd

    md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0)
    if md5_sockfd < 0:
        raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket")

    return md5_sockfd
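
As an aside, a minimal sketch of the same bind/accept dance using Python
3.6+'s native AF_ALG support (this diff targets Python 2, where the ctypes
approach above is required):

    import socket

    with socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET) as setup_sock:
        setup_sock.bind(('hash', 'md5'))    # pick the algorithm
        op_sock, _ = setup_sock.accept()    # per-use hashing socket
        with op_sock:
            op_sock.sendall(b'hello')
            # prints 5d41402abc4b2a76b9719d911017c592
            print(op_sock.recv(16).hex())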


# Flags for splice() and tee()
SPLICE_F_MOVE = 1
SPLICE_F_NONBLOCK = 2
SPLICE_F_MORE = 4
SPLICE_F_GIFT = 8


def splice(fd_in, off_in, fd_out, off_out, length, flags):
    """
    Calls splice - a Linux-specific syscall for zero-copy data movement.

    On success, returns the number of bytes moved.

    On failure where errno is EWOULDBLOCK, returns None.

    On all other failures, raises IOError.
    """
    global _libc_splice
    if _libc_splice is None:
        _libc_splice = load_libc_function('splice', fail_if_missing=True)

    ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in),
                       ctypes.c_int(fd_out), ctypes.c_long(off_out),
                       ctypes.c_int(length), ctypes.c_int(flags))
    if ret < 0:
        err = ctypes.get_errno()
        if err == errno.EWOULDBLOCK:
            return None
        else:
            raise IOError(err, "splice() failed: %s" % os.strerror(err))
    return ret
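
A hedged usage sketch (not from this diff): splice() requires a pipe on one
side, so a zero-copy file copy flows file -> pipe -> file. Passing 0 for the
offsets makes the wrapper hand the kernel null offset pointers, i.e. "use and
advance each descriptor's own file offset":

    def splice_copy(src_path, dst_path, chunk=64 * 1024):
        rpipe, wpipe = os.pipe()
        try:
            with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
                while True:
                    in_pipe = splice(src.fileno(), 0, wpipe, 0, chunk, 0)
                    if not in_pipe:
                        break  # EOF
                    while in_pipe > 0:
                        in_pipe -= splice(rpipe, 0, dst.fileno(), 0,
                                          in_pipe, 0)
        finally:
            os.close(rpipe)
            os.close(wpipe)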


def tee(fd_in, fd_out, length, flags):
    """
    Calls tee - a Linux-specific syscall to let pipes share data.

    On success, returns the number of bytes "copied".

    On failure, raises IOError.
    """
    global _libc_tee
    if _libc_tee is None:
        _libc_tee = load_libc_function('tee', fail_if_missing=True)

    ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out),
                    ctypes.c_int(length), ctypes.c_int(flags))
    if ret < 0:
        err = ctypes.get_errno()
        raise IOError(err, "tee() failed: %s" % os.strerror(err))
    return ret
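
A hedged sketch (not from this diff) of what tee() buys you: it duplicates
pipe contents without consuming them, which is exactly how zero_copy_send()
further down feeds the same bytes to both the MD5 socket and the client:

    r1, w1 = os.pipe()
    r2, w2 = os.pipe()
    os.write(w1, b'payload')
    copied = tee(r1, w2, 7, 0)
    assert copied == 7
    assert os.read(r2, 7) == b'payload'  # the duplicate
    assert os.read(r1, 7) == b'payload'  # the original is still there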


def system_has_splice():
    global _libc_splice
    try:
        _libc_splice = load_libc_function('splice', fail_if_missing=True)
        return True
    except AttributeError:
        return False

@@ -32,6 +32,7 @@ are also not considered part of the backend API.

import cPickle as pickle
import errno
import fcntl
import os
import time
import uuid
@@ -46,6 +47,7 @@ from collections import defaultdict

from xattr import getxattr, setxattr
from eventlet import Timeout
from eventlet.hubs import trampoline

from swift import gettext_ as _
from swift.common.constraints import check_mount
@@ -53,7 +55,9 @@ from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
    storage_directory, hash_path, renamer, fallocate, fsync, \
    fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
    config_true_value, listdir, split_path, ismount, remove_file
    config_true_value, listdir, split_path, ismount, remove_file, \
    get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \
    F_SETPIPE_SZ
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
    DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
    DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
@@ -62,10 +66,12 @@ from swift.common.swob import multi_range_iterator
from swift.common.storage_policy import get_policy_string, POLICIES
from functools import partial


PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
HASH_FILE = 'hashes.pkl'
METADATA_KEY = 'user.swift.metadata'
DROP_CACHE_WINDOW = 1024 * 1024
# These are system-set metadata keys that cannot be changed with a POST.
# They should be lowercase.
DATAFILE_SYSTEM_META = set('content-length content-type deleted etag'.split())
@@ -75,6 +81,7 @@ TMP_BASE = 'tmp'
get_data_dir = partial(get_policy_string, DATADIR_BASE)
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
get_tmp_dir = partial(get_policy_string, TMP_BASE)
MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e'


def read_metadata(fd):
@@ -498,6 +505,37 @@ class DiskFileManager(object):
        self.threadpools = defaultdict(
            lambda: ThreadPool(nthreads=threads_per_disk))

        self.use_splice = False
        self.pipe_size = None

        splice_available = system_has_splice()

        conf_wants_splice = config_true_value(conf.get('splice', 'no'))
        # If the operator wants zero-copy with splice() but we don't have the
        # requisite kernel support, complain so they can go fix it.
        if conf_wants_splice and not splice_available:
            self.logger.warn(
                "Use of splice() requested (config says \"splice = %s\"), "
                "but the system does not support it. "
                "splice() will not be used." % conf.get('splice'))
        elif conf_wants_splice and splice_available:
            try:
                sockfd = get_md5_socket()
                os.close(sockfd)
            except IOError as err:
                # AF_ALG socket support was introduced in kernel 2.6.38; on
                # systems with older kernels (or custom-built kernels lacking
                # AF_ALG support), we can't use zero-copy.
                if err.errno != errno.EAFNOSUPPORT:
                    raise
                self.logger.warn("MD5 sockets not supported. "
                                 "splice() will not be used.")
            else:
                self.use_splice = True
                with open('/proc/sys/fs/pipe-max-size') as f:
                    max_pipe_size = int(f.read())
                self.pipe_size = min(max_pipe_size, self.disk_chunk_size)
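        # Hedged aside (not part of this patch): F_SETPIPE_SZ rounds the
        # request up to a power-of-two multiple of the page size and returns
        # the capacity actually granted, e.g.
        #
        #     r, w = os.pipe()
        #     fcntl.fcntl(r, F_SETPIPE_SZ, 5000)  # -> 8192 with 4 KiB pages
        #
        # which is why zero_copy_send() below trusts fcntl's return value
        # rather than assuming the requested pipe_size was honored.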

    def construct_dev_path(self, device):
        """
        Construct the path to a device without checking if it is mounted.
@@ -564,7 +602,9 @@ class DiskFileManager(object):
            raise DiskFileDeviceUnavailable()
        return DiskFile(self, dev_path, self.threadpools[device],
                        partition, account, container, obj,
                        policy_idx=policy_idx, **kwargs)
                        policy_idx=policy_idx,
                        use_splice=self.use_splice, pipe_size=self.pipe_size,
                        **kwargs)

    def object_audit_location_generator(self, device_dirs=None):
        return object_audit_location_generator(self.devices, self.mount_check,
@@ -830,11 +870,13 @@ class DiskFileReader(object):
    :param device_path: on-disk device path, used when quarantining an obj
    :param logger: logger caller wants this object to use
    :param quarantine_hook: 1-arg callable called w/reason when quarantined
    :param use_splice: if true, use zero-copy splice() to send data
    :param pipe_size: size of pipe buffer used in zero-copy operations
    :param keep_cache: should resulting reads be kept in the buffer cache
    """
    def __init__(self, fp, data_file, obj_size, etag, threadpool,
                 disk_chunk_size, keep_cache_size, device_path, logger,
                 quarantine_hook, keep_cache=False):
                 quarantine_hook, use_splice, pipe_size, keep_cache=False):
        # Parameter tracking
        self._fp = fp
        self._data_file = data_file
@@ -845,6 +887,8 @@ class DiskFileReader(object):
        self._device_path = device_path
        self._logger = logger
        self._quarantine_hook = quarantine_hook
        self._use_splice = use_splice
        self._pipe_size = pipe_size
        if keep_cache:
            # Caller suggests we keep this in cache, only do it if the
            # object's size is less than the maximum.
@@ -857,6 +901,7 @@ class DiskFileReader(object):
        self._bytes_read = 0
        self._started_at_0 = False
        self._read_to_eof = False
        self._md5_of_sent_bytes = None
        self._suppress_file_closing = False
        self._quarantined_dir = None

@@ -877,7 +922,7 @@ class DiskFileReader(object):
                if self._iter_etag:
                    self._iter_etag.update(chunk)
                self._bytes_read += len(chunk)
                if self._bytes_read - dropped_cache > (1024 * 1024):
                if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
                    self._drop_cache(self._fp.fileno(), dropped_cache,
                                     self._bytes_read - dropped_cache)
                    dropped_cache = self._bytes_read
@@ -891,6 +936,109 @@ class DiskFileReader(object):
            if not self._suppress_file_closing:
                self.close()

    def can_zero_copy_send(self):
        return self._use_splice

    def zero_copy_send(self, wsockfd):
        """
        Does some magic with splice() and tee() to move stuff from disk to
        network without ever touching userspace.

        :param wsockfd: file descriptor (integer) of the socket out which to
                        send data
        """
        # Note: if we ever add support for zero-copy ranged GET responses,
        # we'll have to make this conditional.
        self._started_at_0 = True

        rfd = self._fp.fileno()
        client_rpipe, client_wpipe = os.pipe()
        hash_rpipe, hash_wpipe = os.pipe()
        md5_sockfd = get_md5_socket()

        # The actual amount allocated to the pipe may be rounded up to the
        # nearest multiple of the page size. If we have the memory allocated,
        # we may as well use it.
        #
        # Note: this will raise IOError on failure, so we don't bother
        # checking the return value.
        pipe_size = fcntl.fcntl(client_rpipe, F_SETPIPE_SZ, self._pipe_size)
        fcntl.fcntl(hash_rpipe, F_SETPIPE_SZ, pipe_size)

        dropped_cache = 0
        self._bytes_read = 0
        try:
            while True:
                # Read data from disk to pipe
                bytes_in_pipe = self._threadpool.run_in_thread(
                    splice, rfd, 0, client_wpipe, 0, pipe_size, 0)
                if bytes_in_pipe == 0:
                    self._read_to_eof = True
                    self._drop_cache(rfd, dropped_cache,
                                     self._bytes_read - dropped_cache)
                    break
                self._bytes_read += bytes_in_pipe

                # "Copy" data from pipe A to pipe B (really just some pointer
                # manipulation in the kernel, not actual copying).
                bytes_copied = tee(client_rpipe, hash_wpipe, bytes_in_pipe, 0)
                if bytes_copied != bytes_in_pipe:
                    # We teed data between two pipes of equal size, and the
                    # destination pipe was empty. If, somehow, the destination
                    # pipe was full before all the data was teed, we should
                    # fail here. If we don't raise an exception, then we will
                    # have the incorrect MD5 hash once the object has been
                    # sent out, causing a false-positive quarantine.
                    raise Exception("tee() failed: tried to move %d bytes, "
                                    "but only moved %d" %
                                    (bytes_in_pipe, bytes_copied))
                # Take the data and feed it into an in-kernel MD5 socket. The
                # MD5 socket hashes data that is written to it. Reading from
                # it yields the MD5 checksum of the written data.
                #
                # Note that we don't have to worry about splice() returning
                # None here (which happens on EWOULDBLOCK); we're splicing
                # $bytes_in_pipe bytes from a pipe with exactly that many
                # bytes in it, so read won't block, and we're splicing it into
                # an MD5 socket, which synchronously hashes any data sent to
                # it, so writing won't block either.
                hashed = splice(hash_rpipe, 0, md5_sockfd, 0,
                                bytes_in_pipe, SPLICE_F_MORE)
                if hashed != bytes_in_pipe:
                    raise Exception("md5 socket didn't take all the data? "
                                    "(tried to write %d, but wrote %d)" %
                                    (bytes_in_pipe, hashed))

                while bytes_in_pipe > 0:
                    sent = splice(client_rpipe, 0, wsockfd, 0,
                                  bytes_in_pipe, 0)
                    if sent is None:  # would have blocked
                        trampoline(wsockfd, write=True)
                    else:
                        bytes_in_pipe -= sent

                if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
                    self._drop_cache(rfd, dropped_cache,
                                     self._bytes_read - dropped_cache)
                    dropped_cache = self._bytes_read
        finally:
            # Linux MD5 sockets return '00000000000000000000000000000000' for
            # the checksum if you didn't write any bytes to them, instead of
            # returning the correct value.
            if self._bytes_read > 0:
                bin_checksum = os.read(md5_sockfd, 16)
                hex_checksum = ''.join("%02x" % ord(c) for c in bin_checksum)
            else:
                hex_checksum = MD5_OF_EMPTY_STRING
            self._md5_of_sent_bytes = hex_checksum

            os.close(client_rpipe)
            os.close(client_wpipe)
            os.close(hash_rpipe)
            os.close(hash_wpipe)
            os.close(md5_sockfd)
            self.close()

    def app_iter_range(self, start, stop):
        """Returns an iterator over the data file for range (start, stop)"""
        if start or start == 0:
@@ -942,15 +1090,18 @@ class DiskFileReader(object):

    def _handle_close_quarantine(self):
        """Check if file needs to be quarantined"""
        if self._iter_etag and not self._md5_of_sent_bytes:
            self._md5_of_sent_bytes = self._iter_etag.hexdigest()

        if self._bytes_read != self._obj_size:
            self._quarantine(
                "Bytes read: %s, does not match metadata: %s" % (
                    self._bytes_read, self._obj_size))
        elif self._iter_etag and \
                self._etag != self._iter_etag.hexdigest():
        elif self._md5_of_sent_bytes and \
                self._etag != self._md5_of_sent_bytes:
            self._quarantine(
                "ETag %s and file's md5 %s do not match" % (
                    self._etag, self._iter_etag.hexdigest()))
                    self._etag, self._md5_of_sent_bytes))

    def close(self):
        """
@@ -998,17 +1149,21 @@ class DiskFile(object):
    :param obj: object name for the object
    :param _datadir: override the full datadir otherwise constructed here
    :param policy_idx: used to get the data dir when constructing it here
    :param use_splice: if true, use zero-copy splice() to send data
    :param pipe_size: size of pipe buffer used in zero-copy operations
    """

    def __init__(self, mgr, device_path, threadpool, partition,
                 account=None, container=None, obj=None, _datadir=None,
                 policy_idx=0):
                 policy_idx=0, use_splice=False, pipe_size=None):
        self._mgr = mgr
        self._device_path = device_path
        self._threadpool = threadpool or ThreadPool(nthreads=0)
        self._logger = mgr.logger
        self._disk_chunk_size = mgr.disk_chunk_size
        self._bytes_per_sync = mgr.bytes_per_sync
        self._use_splice = use_splice
        self._pipe_size = pipe_size
        if account and container and obj:
            self._name = '/' + '/'.join((account, container, obj))
            self._account = account
@@ -1377,7 +1532,8 @@ class DiskFile(object):
            self._fp, self._data_file, int(self._metadata['Content-Length']),
            self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
            self._mgr.keep_cache_size, self._device_path, self._logger,
            quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
            use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
            pipe_size=self._pipe_size, keep_cache=keep_cache)
        # At this point the reader object is now responsible for closing
        # the file pointer.
        self._fp = None

@@ -25,7 +25,7 @@ import math
from swift import gettext_ as _
from hashlib import md5

from eventlet import sleep, Timeout
from eventlet import sleep, wsgi, Timeout

from swift.common.utils import public, get_logger, \
    config_true_value, timing_stats, replication, \
@@ -50,6 +50,19 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager


class EventletPlungerString(str):
    """
    Eventlet won't send headers until it's accumulated at least
    eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we
    want to send the response body behind Eventlet's back, perhaps with some
    zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi
    to force the headers out, so we use an EventletPlungerString to empty out
    all of Eventlet's buffers.
    """
    def __len__(self):
        return wsgi.MINIMUM_CHUNK_SIZE + 1
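
A hedged illustration (not from this diff): the plunger is an empty string
that merely claims to be huge, so eventlet's "have we buffered enough to
flush?" length check trips immediately while no real bytes are added:

    plunger = EventletPlungerString()
    assert plunger == ''                                 # adds nothing
    assert len(plunger) == wsgi.MINIMUM_CHUNK_SIZE + 1   # but forces a flush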


class ObjectController(object):
    """Implements the WSGI application for the Swift Object Server."""

@@ -710,7 +723,57 @@ class ObjectController(object):
            slow = self.slow - trans_time
            if slow > 0:
                sleep(slow)
        return res(env, start_response)

        # To be able to zero-copy send the object, we need a few things.
        # First, we have to be responding successfully to a GET, or else we're
        # not sending the object. Second, we have to be able to extract the
        # socket file descriptor from the WSGI input object. Third, the
        # diskfile has to support zero-copy send.
        #
        # There's a good chance that this could work for 206 responses too,
        # but the common case is sending the whole object, so we'll start
        # there.
        if req.method == 'GET' and res.status_int == 200 and \
                isinstance(env['wsgi.input'], wsgi.Input):
            app_iter = getattr(res, 'app_iter', None)
            checker = getattr(app_iter, 'can_zero_copy_send', None)
            if checker and checker():
                # For any kind of zero-copy thing like sendfile or splice, we
                # need the file descriptor. Eventlet doesn't provide a clean
                # way of getting that, so we resort to this.
                wsock = env['wsgi.input'].get_socket()
                wsockfd = wsock.fileno()

                # Don't call zero_copy_send() until after we force the HTTP
                # headers out of Eventlet and into the socket.
                def zero_copy_iter():
                    # If possible, set TCP_CORK so that headers don't
                    # immediately go on the wire, but instead, wait for some
                    # response body to make the TCP frames as large as
                    # possible (and hence as few packets as possible).
                    #
                    # On non-Linux systems, we might consider TCP_NODELAY, but
                    # since the only known zero-copy-capable diskfile uses
                    # Linux-specific syscalls, we'll defer that work until
                    # someone needs it.
                    if hasattr(socket, 'TCP_CORK'):
                        wsock.setsockopt(socket.IPPROTO_TCP,
                                         socket.TCP_CORK, 1)
                    yield EventletPlungerString()
                    try:
                        app_iter.zero_copy_send(wsockfd)
                    except Exception:
                        self.logger.exception("zero_copy_send() blew up")
                        raise
                    yield ''

                # Get headers ready to go out
                res(env, start_response)
                return zero_copy_iter()
            else:
                return res(env, start_response)
        else:
            return res(env, start_response)


def global_conf_callback(preloaded_app_conf, global_conf):

@@ -376,7 +376,8 @@ class FakeLogger(logging.Logger):

    def _clear(self):
        self.log_dict = defaultdict(list)
        self.lines_dict = defaultdict(list)
        self.lines_dict = {'critical': [], 'error': [], 'info': [],
                           'warning': [], 'debug': []}

    def _store_in(store_name):
        def stub_fn(self, *args, **kwargs):
@@ -390,8 +391,17 @@ class FakeLogger(logging.Logger):
        return stub_fn

    def get_lines_for_level(self, level):
        if level not in self.lines_dict:
            raise KeyError(
                "Invalid log level '%s'; valid levels are %s" %
                (level,
                 ', '.join("'%s'" % lvl for lvl in sorted(self.lines_dict))))
        return self.lines_dict[level]

    def all_log_lines(self):
        return dict((level, msgs) for level, msgs in self.lines_dict.items()
                    if len(msgs) > 0)

    error = _store_and_log_in('error', logging.ERROR)
    info = _store_and_log_in('info', logging.INFO)
    warning = _store_and_log_in('warning', logging.WARNING)

@@ -28,7 +28,7 @@ class TestUtils(unittest.TestCase):
        logger = FakeLogger()
        csr = ContainerSyncRealms(unique, logger)
        self.assertEqual(
            logger.lines_dict,
            logger.all_log_lines(),
            {'debug': [
                "Could not load '%s': [Errno 2] No such file or directory: "
                "'%s'" % (unique, unique)]})
@@ -45,7 +45,7 @@ class TestUtils(unittest.TestCase):
            csr = ContainerSyncRealms(fpath, logger)
            try:
                self.assertEqual(
                    logger.lines_dict,
                    logger.all_log_lines(),
                    {'error': [
                        "Could not load '%s': [Errno 13] Permission denied: "
                        "'%s'" % (fpath, fpath)]})
@@ -61,7 +61,7 @@ class TestUtils(unittest.TestCase):
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.lines_dict, {})
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), [])

@@ -73,7 +73,7 @@ class TestUtils(unittest.TestCase):
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(
                logger.lines_dict,
                logger.all_log_lines(),
                {'error': [
                    "Could not load '%s': File contains no section headers.\n"
                    "file: %s, line: 1\n"
@@ -92,7 +92,7 @@ cluster_dfw1 = http://dfw1.host/v1/
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.lines_dict, {})
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), ['US'])
            self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
@@ -120,7 +120,7 @@ cluster_lon3 = http://lon3.host/v1/
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.lines_dict, {})
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 60)
            self.assertEqual(sorted(csr.realms()), ['UK', 'US'])
            self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
@@ -144,7 +144,7 @@ cluster_lon3 = http://lon3.host/v1/
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.lines_dict, {})
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), ['US'])
            self.assertEqual(csr.key('US'), None)
@@ -163,7 +163,7 @@ mtime_check_interval = invalid
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(
                logger.lines_dict,
                logger.all_log_lines(),
                {'error': [
                    "Error in '%s' with mtime_check_interval: invalid literal "
                    "for int() with base 10: 'invalid'" % fpath]})

@@ -1503,6 +1503,9 @@ class TestUtils(unittest.TestCase):
            utils.load_libc_function('printf')))
        self.assert_(callable(
            utils.load_libc_function('some_not_real_function')))
        self.assertRaises(AttributeError,
                          utils.load_libc_function, 'some_not_real_function',
                          fail_if_missing=True)

    def test_readconf(self):
        conf = '''[section1]

@@ -36,6 +36,7 @@ from eventlet import tpool
from test.unit import (FakeLogger, mock as unit_mock, temptree,
                       patch_policies, debug_logger)

from nose import SkipTest
from swift.obj import diskfile
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, Timestamp
@@ -951,6 +952,18 @@ class TestDiskFileManager(unittest.TestCase):
                lock_exc = err
            self.assertTrue(lock_exc is None)

    def test_missing_splice_warning(self):
        logger = FakeLogger()
        with mock.patch('swift.obj.diskfile.system_has_splice',
                        lambda: False):
            self.conf['splice'] = 'yes'
            mgr = diskfile.DiskFileManager(self.conf, logger)

        warnings = logger.get_lines_for_level('warning')
        self.assertTrue(len(warnings) > 0)
        self.assertTrue('splice()' in warnings[-1])
        self.assertFalse(mgr.use_splice)


@patch_policies
class TestDiskFile(unittest.TestCase):
@@ -2183,6 +2196,50 @@ class TestDiskFile(unittest.TestCase):
        self.assertEquals(len(dl), 2)
        self.assertTrue(exp_name in set(dl))

    def _system_can_zero_copy(self):
        if not utils.system_has_splice():
            return False

        try:
            utils.get_md5_socket()
        except IOError:
            return False

        return True

    def test_zero_copy_cache_dropping(self):
        if not self._system_can_zero_copy():
            raise SkipTest("zero-copy support is missing")

        self.conf['splice'] = 'on'
        self.conf['keep_cache_size'] = 16384
        self.conf['disk_chunk_size'] = 4096
        self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())

        df = self._get_open_disk_file(fsize=16385)
        reader = df.reader()
        self.assertTrue(reader.can_zero_copy_send())
        with mock.patch("swift.obj.diskfile.drop_buffer_cache") as dbc:
            with mock.patch("swift.obj.diskfile.DROP_CACHE_WINDOW", 4095):
                with open('/dev/null', 'w') as devnull:
                    reader.zero_copy_send(devnull.fileno())
                self.assertEqual(len(dbc.mock_calls), 5)

    def test_zero_copy_turns_off_when_md5_sockets_not_supported(self):
        if not self._system_can_zero_copy():
            raise SkipTest("zero-copy support is missing")

        self.conf['splice'] = 'on'
        with mock.patch('swift.obj.diskfile.get_md5_socket') as mock_md5sock:
            mock_md5sock.side_effect = IOError(
                errno.EAFNOSUPPORT, "MD5 socket busted")
            df = self._get_open_disk_file(fsize=128)
            reader = df.reader()
            self.assertFalse(reader.can_zero_copy_send())

            log_lines = self.df_mgr.logger.get_lines_for_level('warning')
            self.assert_('MD5 sockets' in log_lines[-1])


if __name__ == '__main__':
    unittest.main()

@@ -33,6 +33,7 @@ import itertools
import tempfile

from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
from eventlet.green import httplib

from nose import SkipTest

@@ -4373,5 +4374,136 @@ class TestObjectServer(unittest.TestCase):
            resp.close()


class TestZeroCopy(unittest.TestCase):
    """Test the object server's zero-copy functionality"""

    def _system_can_zero_copy(self):
        if not utils.system_has_splice():
            return False

        try:
            utils.get_md5_socket()
        except IOError:
            return False

        return True

    def setUp(self):
        if not self._system_can_zero_copy():
            raise SkipTest("zero-copy support is missing")

        self.testdir = mkdtemp(suffix="obj_server_zero_copy")
        mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))

        conf = {'devices': self.testdir,
                'mount_check': 'false',
                'splice': 'yes',
                'disk_chunk_size': '4096'}
        self.object_controller = object_server.ObjectController(
            conf, logger=debug_logger())
        self.df_mgr = diskfile.DiskFileManager(
            conf, self.object_controller.logger)

        listener = listen(('localhost', 0))
        port = listener.getsockname()[1]
        self.wsgi_greenlet = spawn(
            wsgi.server, listener, self.object_controller, NullLogger())

        self.http_conn = httplib.HTTPConnection('localhost', port)
        self.http_conn.connect()

    def tearDown(self):
        """Tear down for testing swift.object.server.ObjectController"""
        self.wsgi_greenlet.kill()
        rmtree(self.testdir)

    def test_GET(self):
        url_path = '/sda1/2100/a/c/o'

        self.http_conn.request('PUT', url_path, 'obj contents',
                               {'X-Timestamp': '127082564.24709'})
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 201)
        response.read()

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 200)
        contents = response.read()
        self.assertEqual(contents, 'obj contents')

    def test_GET_big(self):
        # Test with a large-ish object to make sure we handle full socket
        # buffers correctly.
        obj_contents = 'A' * 4 * 1024 * 1024  # 4 MiB
        url_path = '/sda1/2100/a/c/o'

        self.http_conn.request('PUT', url_path, obj_contents,
                               {'X-Timestamp': '1402600322.52126'})
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 201)
        response.read()

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 200)
        contents = response.read()
        self.assertEqual(contents, obj_contents)

    def test_quarantine(self):
        obj_hash = hash_path('a', 'c', 'o')
        url_path = '/sda1/2100/a/c/o'
        ts = '1402601849.47475'

        self.http_conn.request('PUT', url_path, 'obj contents',
                               {'X-Timestamp': ts})
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 201)
        response.read()

        # go goof up the file on disk
        fname = os.path.join(self.testdir, 'sda1', 'objects', '2100',
                             obj_hash[-3:], obj_hash, ts + '.data')

        with open(fname, 'rb+') as fh:
            fh.write('XYZ')

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 200)
        contents = response.read()
        self.assertEqual(contents, 'XYZ contents')

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        # it was quarantined by the previous request
        self.assertEqual(response.status, 404)
        response.read()

    def test_quarantine_on_well_formed_zero_byte_file(self):
        # Make sure we work around an oddity in Linux's hash sockets
        url_path = '/sda1/2100/a/c/o'
        ts = '1402700497.71333'

        self.http_conn.request(
            'PUT', url_path, '',
            {'X-Timestamp': ts, 'Content-Length': '0'})
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 201)
        response.read()

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 200)
        contents = response.read()
        self.assertEqual(contents, '')

        self.http_conn.request('GET', url_path)
        response = self.http_conn.getresponse()
        self.assertEqual(response.status, 200)  # still there
        contents = response.read()
        self.assertEqual(contents, '')


if __name__ == '__main__':
    unittest.main()