Rework splice and tee

This patch reworks the bindings to `splice` and `tee` in order to fix a
(potential) bug when using the old `splice` binding and passing offsets
(see https://review.openstack.org/#/c/135319/ for a discussion of the
issue).

The new binding code (based on https://github.com/NicolasT/tee-n-splice)
uses more `ctypes` features w.r.t. parameter and return value handling.

It also introduces a test suite for both calls.

Change-Id: Ib8084ca20fe7a199a00067da9386c2ccf618755c
This commit is contained in:
Nicolas Trangez 2014-11-18 15:57:06 +01:00
parent 477ec4133b
commit 2a0a8ae00f
6 changed files with 454 additions and 84 deletions

197
swift/common/splice.py Normal file
View File

@ -0,0 +1,197 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Bindings to the `tee` and `splice` system calls
'''
import ctypes
import ctypes.util
import numbers
import os
__all__ = ['tee', 'splice']
# `loff_t` is a 64-bit signed integer on Linux, whatever the native word size.
# `ctypes.c_long` is only 32 bits wide on 32-bit platforms, which would make
# `splice` read and write offsets of the wrong width, so use `c_longlong`.
c_loff_t = ctypes.c_longlong
class Tee(object):
    '''Callable binding to the `tee` system call

    The libc symbol is looked up once, when the binding is instantiated. If
    libc does not export `tee`, the instance is still created, but it reports
    itself as unavailable (see `available`) and refuses to be called.
    '''

    __slots__ = '_c_tee',

    def __init__(self):
        # `use_errno=True` makes ctypes keep a private copy of `errno` which
        # `errcheck` below reads through `ctypes.set_errno`.
        libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

        try:
            c_tee = libc.tee
        except AttributeError:
            # No `tee` symbol in libc: mark the binding as unavailable.
            self._c_tee = None
            return

        c_tee.argtypes = [
            ctypes.c_int,
            ctypes.c_int,
            ctypes.c_size_t,
            ctypes.c_uint
        ]
        c_tee.restype = ctypes.c_ssize_t

        def errcheck(result, func, arguments):
            '''Turn a `-1` result into an `IOError` carrying `errno`'''
            if result != -1:
                return result

            # `set_errno` returns the previous value, so this both fetches
            # the error code and resets the ctypes copy of `errno` to 0.
            err = ctypes.set_errno(0)
            raise IOError(err, 'tee: %s' % os.strerror(err))

        c_tee.errcheck = errcheck

        self._c_tee = c_tee

    def __call__(self, fd_in, fd_out, len_, flags):
        '''See `man 2 tee`

        File-descriptors can be file-like objects with a `fileno` method, or
        integers.

        Flags can be an integer value, or an iterable of flags (exposed on
        `splice`) which are OR-ed together. An empty iterable means no flags.

        This function returns the number of bytes transferred (i.e. the actual
        result of the call to `tee`).

        If the binding is not available, an `EnvironmentError` is raised. If
        the system call itself fails, an `IOError` is raised with the proper
        `errno` set.
        '''

        if not self.available:
            raise EnvironmentError('tee not available')

        # `numbers.Integral` covers `int` (and `long` on Python 2) without
        # naming `long`, which does not exist on Python 3.
        if isinstance(flags, numbers.Integral):
            c_flags = flags
        else:
            c_flags = 0
            for flag in flags:
                c_flags |= flag

        # Accept both raw descriptors and objects exposing `fileno()`.
        c_fd_in = getattr(fd_in, 'fileno', lambda: fd_in)()
        c_fd_out = getattr(fd_out, 'fileno', lambda: fd_out)()

        return self._c_tee(c_fd_in, c_fd_out, len_, c_flags)

    @property
    def available(self):
        '''Availability of `tee`'''

        return self._c_tee is not None


# Only the singleton binding is exposed; the class itself is dropped from the
# module namespace.
tee = Tee()
del Tee
class Splice(object):
    '''Callable binding to the `splice` system call

    The libc symbol is looked up once, when the binding is instantiated. If
    libc does not export `splice`, the instance is still created, but it
    reports itself as unavailable (see `available`) and refuses to be called.

    The `SPLICE_F_*` flag values are exposed as attributes.
    '''

    # From `bits/fcntl-linux.h`
    SPLICE_F_MOVE = 1
    SPLICE_F_NONBLOCK = 2
    SPLICE_F_MORE = 4
    SPLICE_F_GIFT = 8

    __slots__ = '_c_splice',

    def __init__(self):
        # `use_errno=True` makes ctypes keep a private copy of `errno` which
        # `errcheck` below reads through `ctypes.set_errno`.
        libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

        try:
            c_splice = libc.splice
        except AttributeError:
            # No `splice` symbol in libc: mark the binding as unavailable.
            self._c_splice = None
            return

        c_loff_t_p = ctypes.POINTER(c_loff_t)

        c_splice.argtypes = [
            ctypes.c_int, c_loff_t_p,
            ctypes.c_int, c_loff_t_p,
            ctypes.c_size_t,
            ctypes.c_uint
        ]
        c_splice.restype = ctypes.c_ssize_t

        def errcheck(result, func, arguments):
            '''Raise `IOError` on `-1`, else return result and offsets'''
            if result == -1:
                # `set_errno` returns the previous value, so this both
                # fetches the error code and resets the ctypes copy to 0.
                err = ctypes.set_errno(0)
                raise IOError(err, 'splice: %s' % os.strerror(err))

            # `arguments` holds the objects handed over by `__call__`: each
            # offset is either `None` or a pointer whose target now holds
            # the offset value after the call.
            off_in = arguments[1]
            off_out = arguments[3]

            return (
                result,
                None if off_in is None else off_in.contents.value,
                None if off_out is None else off_out.contents.value)

        c_splice.errcheck = errcheck

        self._c_splice = c_splice

    def __call__(self, fd_in, off_in, fd_out, off_out, len_, flags):
        '''See `man 2 splice`

        File-descriptors can be file-like objects with a `fileno` method, or
        integers.

        Flags can be an integer value, or an iterable of flags (exposed on
        this object) which are OR-ed together. An empty iterable means no
        flags.

        Returns a tuple of the result of the `splice` call, the output value of
        `off_in` and the output value of `off_out` (or `None` for any of these
        output values, if applicable).

        If the binding is not available, an `EnvironmentError` is raised. If
        the system call itself fails, an `IOError` is raised with the proper
        `errno` set.

        Note: if you want to pass `NULL` as value for `off_in` or `off_out` to
        the system call, you must pass `None`, *not* 0!
        '''

        if not self.available:
            raise EnvironmentError('splice not available')

        # `numbers.Integral` covers `int` (and `long` on Python 2) without
        # naming `long`, which does not exist on Python 3.
        if isinstance(flags, numbers.Integral):
            c_flags = flags
        else:
            c_flags = 0
            for flag in flags:
                c_flags |= flag

        # Accept both raw descriptors and objects exposing `fileno()`.
        c_fd_in = getattr(fd_in, 'fileno', lambda: fd_in)()
        c_fd_out = getattr(fd_out, 'fileno', lambda: fd_out)()

        # `None` is converted to a NULL pointer by ctypes; any other value is
        # boxed so that it can be passed by reference.
        if off_in is None:
            c_off_in = None
        else:
            c_off_in = ctypes.pointer(c_loff_t(off_in))

        if off_out is None:
            c_off_out = None
        else:
            c_off_out = ctypes.pointer(c_loff_t(off_out))

        return self._c_splice(
            c_fd_in, c_off_in, c_fd_out, c_off_out, len_, c_flags)

    @property
    def available(self):
        '''Availability of `splice`'''

        return self._c_splice is not None


# Only the singleton binding is exposed; the class itself is dropped from the
# module namespace.
splice = Splice()
del Splice

View File

@ -87,8 +87,6 @@ _posix_fadvise = None
_libc_socket = None _libc_socket = None
_libc_bind = None _libc_bind = None
_libc_accept = None _libc_accept = None
_libc_splice = None
_libc_tee = None
# If set to non-zero, fallocate routines will fail based on free space # If set to non-zero, fallocate routines will fail based on free space
# available being at or below this amount, in bytes. # available being at or below this amount, in bytes.
@ -3215,65 +3213,3 @@ def get_md5_socket():
raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket")
return md5_sockfd return md5_sockfd
# Flags for splice() and tee(). Each value is a distinct bit, so several
# flags can be OR-ed together into the single `flags` argument.
SPLICE_F_MOVE = 1
SPLICE_F_NONBLOCK = 2
SPLICE_F_MORE = 4
SPLICE_F_GIFT = 8
def splice(fd_in, off_in, fd_out, off_out, length, flags):
    """
    Invoke splice(), the Linux-specific syscall for zero-copy data movement.

    The libc function is resolved on first use and cached in the module-level
    ``_libc_splice``.

    Returns the number of bytes moved on success, or None when the call
    failed with errno EWOULDBLOCK. Any other failure raises IOError.

    NOTE(review): both offsets are wrapped in ``ctypes.c_long`` and passed by
    value; splice(2) takes pointers there, so presumably only 0 (i.e. NULL)
    is safe to pass -- confirm before using any other offset.
    """
    global _libc_splice
    if _libc_splice is None:
        _libc_splice = load_libc_function('splice', fail_if_missing=True)

    c_args = (
        ctypes.c_int(fd_in), ctypes.c_long(off_in),
        ctypes.c_int(fd_out), ctypes.c_long(off_out),
        ctypes.c_int(length), ctypes.c_int(flags))
    moved = _libc_splice(*c_args)
    if moved >= 0:
        return moved

    code = ctypes.get_errno()
    if code != errno.EWOULDBLOCK:
        raise IOError(code, "splice() failed: %s" % os.strerror(code))
    # The call would have blocked; the caller is expected to retry.
    return None
def tee(fd_in, fd_out, length, flags):
    """
    Invoke tee(), the Linux-specific syscall that lets pipes share data.

    The libc function is resolved on first use and cached in the module-level
    ``_libc_tee``.

    Returns the number of bytes "copied" on success; raises IOError when the
    call reports a failure.
    """
    global _libc_tee
    if _libc_tee is None:
        _libc_tee = load_libc_function('tee', fail_if_missing=True)

    c_args = (
        ctypes.c_int(fd_in), ctypes.c_int(fd_out),
        ctypes.c_int(length), ctypes.c_int(flags))
    copied = _libc_tee(*c_args)
    if copied >= 0:
        return copied

    code = ctypes.get_errno()
    raise IOError(code, "tee() failed: %s" % os.strerror(code))
def system_has_splice():
    """
    Report whether libc provides splice().

    As a side effect, a successful lookup stores the resolved function in the
    module-level ``_libc_splice``.
    """
    global _libc_splice
    try:
        _libc_splice = load_libc_function('splice', fail_if_missing=True)
    except AttributeError:
        return False
    else:
        return True

View File

@ -56,8 +56,8 @@ from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \ storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
config_true_value, listdir, split_path, ismount, remove_file, \ config_true_value, listdir, split_path, ismount, remove_file, \
get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \ get_md5_socket, F_SETPIPE_SZ
F_SETPIPE_SZ from swift.common.splice import splice, tee
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
@ -544,17 +544,15 @@ class DiskFileManager(object):
self.use_splice = False self.use_splice = False
self.pipe_size = None self.pipe_size = None
splice_available = system_has_splice()
conf_wants_splice = config_true_value(conf.get('splice', 'no')) conf_wants_splice = config_true_value(conf.get('splice', 'no'))
# If the operator wants zero-copy with splice() but we don't have the # If the operator wants zero-copy with splice() but we don't have the
# requisite kernel support, complain so they can go fix it. # requisite kernel support, complain so they can go fix it.
if conf_wants_splice and not splice_available: if conf_wants_splice and not splice.available:
self.logger.warn( self.logger.warn(
"Use of splice() requested (config says \"splice = %s\"), " "Use of splice() requested (config says \"splice = %s\"), "
"but the system does not support it. " "but the system does not support it. "
"splice() will not be used." % conf.get('splice')) "splice() will not be used." % conf.get('splice'))
elif conf_wants_splice and splice_available: elif conf_wants_splice and splice.available:
try: try:
sockfd = get_md5_socket() sockfd = get_md5_socket()
os.close(sockfd) os.close(sockfd)
@ -1006,8 +1004,8 @@ class DiskFileReader(object):
try: try:
while True: while True:
# Read data from disk to pipe # Read data from disk to pipe
bytes_in_pipe = self._threadpool.run_in_thread( (bytes_in_pipe, _1, _2) = self._threadpool.run_in_thread(
splice, rfd, 0, client_wpipe, 0, pipe_size, 0) splice, rfd, None, client_wpipe, None, pipe_size, 0)
if bytes_in_pipe == 0: if bytes_in_pipe == 0:
self._read_to_eof = True self._read_to_eof = True
self._drop_cache(rfd, dropped_cache, self._drop_cache(rfd, dropped_cache,
@ -1038,20 +1036,23 @@ class DiskFileReader(object):
# bytes in it, so read won't block, and we're splicing it into # bytes in it, so read won't block, and we're splicing it into
# an MD5 socket, which synchronously hashes any data sent to # an MD5 socket, which synchronously hashes any data sent to
# it, so writing won't block either. # it, so writing won't block either.
hashed = splice(hash_rpipe, 0, md5_sockfd, 0, (hashed, _1, _2) = splice(hash_rpipe, None, md5_sockfd, None,
bytes_in_pipe, SPLICE_F_MORE) bytes_in_pipe, splice.SPLICE_F_MORE)
if hashed != bytes_in_pipe: if hashed != bytes_in_pipe:
raise Exception("md5 socket didn't take all the data? " raise Exception("md5 socket didn't take all the data? "
"(tried to write %d, but wrote %d)" % "(tried to write %d, but wrote %d)" %
(bytes_in_pipe, hashed)) (bytes_in_pipe, hashed))
while bytes_in_pipe > 0: while bytes_in_pipe > 0:
sent = splice(client_rpipe, 0, wsockfd, 0, try:
res = splice(client_rpipe, None, wsockfd, None,
bytes_in_pipe, 0) bytes_in_pipe, 0)
if sent is None: # would have blocked bytes_in_pipe -= res[0]
except IOError as exc:
if exc.errno == errno.EWOULDBLOCK:
trampoline(wsockfd, write=True) trampoline(wsockfd, write=True)
else: else:
bytes_in_pipe -= sent raise
if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW: if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
self._drop_cache(rfd, dropped_cache, self._drop_cache(rfd, dropped_cache,

View File

@ -0,0 +1,235 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Tests for `swift.common.splice`'''
import os
import errno
import ctypes
import logging
import tempfile
import unittest
import contextlib
from mock import patch
from swift.common.splice import splice, tee
LOGGER = logging.getLogger(__name__)
def safe_close(fd):
    '''Best-effort close of a raw file descriptor

    Any exception raised while closing is logged through `LOGGER` and then
    swallowed, so this is safe to call from cleanup code.
    '''

    try:
        os.close(fd)
    except Exception:
        LOGGER.exception('Error while closing FD')
@contextlib.contextmanager
def pipe():
    '''Context-manager yielding the (read, write) ends of a fresh pipe

    Both descriptors are closed with `safe_close` when the block exits,
    whether or not it raised.
    '''

    read_end, write_end = os.pipe()

    try:
        yield (read_end, write_end)
    finally:
        safe_close(read_end)
        safe_close(write_end)
@unittest.skipUnless(splice.available, 'splice not available')
class TestSplice(unittest.TestCase):
    '''Tests for `splice`'''

    def test_flags(self):
        '''Test flag attribute availability'''

        self.assertTrue(hasattr(splice, 'SPLICE_F_MOVE'))
        self.assertTrue(hasattr(splice, 'SPLICE_F_NONBLOCK'))
        self.assertTrue(hasattr(splice, 'SPLICE_F_MORE'))
        self.assertTrue(hasattr(splice, 'SPLICE_F_GIFT'))

    @patch('swift.common.splice.splice._c_splice', None)
    def test_available(self):
        '''Test `available` attribute correctness'''

        self.assertFalse(splice.available)

    def test_splice_pipe_to_pipe(self):
        '''Test `splice` from a pipe to a pipe'''

        with pipe() as (p1a, p1b), pipe() as (p2a, p2b):
            os.write(p1b, 'abcdef')
            res = splice(p1a, None, p2b, None, 3, 0)
            self.assertEqual(res, (3, None, None))
            # Spliced bytes are consumed from the source pipe
            self.assertEqual(os.read(p2a, 3), 'abc')
            self.assertEqual(os.read(p1a, 3), 'def')

    def test_splice_file_to_pipe(self):
        '''Test `splice` from a file to a pipe'''

        with tempfile.NamedTemporaryFile(bufsize=0) as fd, pipe() as (pa, pb):
            fd.write('abcdef')
            fd.seek(0, os.SEEK_SET)

            # Without an explicit offset the file position advances
            res = splice(fd, None, pb, None, 3, 0)
            self.assertEqual(res, (3, None, None))
            # `fd.tell()` isn't updated...
            self.assertEqual(os.lseek(fd.fileno(), 0, os.SEEK_CUR), 3)

            fd.seek(0, os.SEEK_SET)

            # With an explicit offset the file position is left untouched
            # and the updated offset is returned instead
            res = splice(fd, 3, pb, None, 3, 0)
            self.assertEqual(res, (3, 6, None))
            self.assertEqual(os.lseek(fd.fileno(), 0, os.SEEK_CUR), 0)

            self.assertEqual(os.read(pa, 6), 'abcdef')

    def test_splice_pipe_to_file(self):
        '''Test `splice` from a pipe to a file'''

        with tempfile.NamedTemporaryFile(bufsize=0) as fd, pipe() as (pa, pb):
            os.write(pb, 'abcdef')

            # Without an explicit offset the file position advances
            res = splice(pa, None, fd, None, 3, 0)
            self.assertEqual(res, (3, None, None))
            self.assertEqual(fd.tell(), 3)

            fd.seek(0, os.SEEK_SET)

            # With an explicit offset the file position is left untouched
            # and the updated offset is returned instead
            res = splice(pa, None, fd, 3, 3, 0)
            self.assertEqual(res, (3, None, 6))
            self.assertEqual(fd.tell(), 0)

            self.assertEqual(fd.read(6), 'abcdef')

    @patch.object(splice, '_c_splice')
    def test_fileno(self, mock_splice):
        '''Test handling of file-descriptors'''

        # Plain integers are passed through unchanged
        splice(1, None, 2, None, 3, 0)
        self.assertEqual(mock_splice.call_args,
                         ((1, None, 2, None, 3, 0), {}))

        mock_splice.reset_mock()

        # File-like objects are replaced by the result of `fileno()`
        with open('/dev/zero', 'r') as fd:
            splice(fd, None, fd, None, 3, 0)
            self.assertEqual(mock_splice.call_args,
                             ((fd.fileno(), None, fd.fileno(), None, 3, 0),
                              {}))

    @patch.object(splice, '_c_splice')
    def test_flags_list(self, mock_splice):
        '''Test handling of flag lists'''

        splice(1, None, 2, None, 3,
               [splice.SPLICE_F_MOVE, splice.SPLICE_F_NONBLOCK])

        flags = splice.SPLICE_F_MOVE | splice.SPLICE_F_NONBLOCK
        self.assertEqual(mock_splice.call_args,
                         ((1, None, 2, None, 3, flags), {}))

        mock_splice.reset_mock()

        # An empty list means "no flags"
        splice(1, None, 2, None, 3, [])
        self.assertEqual(mock_splice.call_args,
                         ((1, None, 2, None, 3, 0), {}))

    def test_errno(self):
        '''Test handling of failures'''

        # Invoke EBADF by using a read-only FD as fd_out
        with open('/dev/null', 'r') as fd:
            err = errno.EBADF
            msg = r'\[Errno %d\] splice: %s' % (err, os.strerror(err))

            self.assertRaisesRegexp(IOError, msg, splice, fd, None, fd, None,
                                    3, 0)

            # The binding must reset the ctypes copy of `errno` after use
            self.assertEqual(ctypes.get_errno(), 0)

    @patch('swift.common.splice.splice._c_splice', None)
    def test_unavailable(self):
        '''Test exception when unavailable'''

        self.assertRaises(EnvironmentError, splice, 1, None, 2, None, 2, 0)
@unittest.skipUnless(tee.available, 'tee not available')
class TestTee(unittest.TestCase):
    '''Tests for `tee`'''

    @patch('swift.common.splice.tee._c_tee', None)
    def test_available(self):
        '''Test `available` attribute correctness'''

        self.assertFalse(tee.available)

    def test_tee_pipe_to_pipe(self):
        '''Test `tee` from a pipe to a pipe'''

        with pipe() as (p1a, p1b), pipe() as (p2a, p2b):
            os.write(p1b, 'abcdef')
            res = tee(p1a, p2b, 3, 0)
            self.assertEqual(res, 3)
            # `tee` duplicates the data: the source pipe still holds it all
            self.assertEqual(os.read(p2a, 3), 'abc')
            self.assertEqual(os.read(p1a, 6), 'abcdef')

    @patch.object(tee, '_c_tee')
    def test_fileno(self, mock_tee):
        '''Test handling of file-descriptors'''

        with pipe() as (pa, pb):
            # Plain integers are passed through unchanged
            tee(pa, pb, 3, 0)
            self.assertEqual(mock_tee.call_args, ((pa, pb, 3, 0), {}))

            mock_tee.reset_mock()

            # File-like objects are replaced by the result of `fileno()`.
            # Wrap duplicates of the descriptors, so that closing the file
            # objects does not also close the descriptors owned by `pipe()`
            # (which would then be closed a second time on exit).
            with os.fdopen(os.dup(pa), 'r') as fa, \
                    os.fdopen(os.dup(pb), 'w') as fb:
                tee(fa, fb, 3, 0)
                self.assertEqual(mock_tee.call_args,
                                 ((fa.fileno(), fb.fileno(), 3, 0), {}))

    @patch.object(tee, '_c_tee')
    def test_flags_list(self, mock_tee):
        '''Test handling of flag lists'''

        # Pass the flags as separate list items, so the binding itself has
        # to OR them together.
        tee(1, 2, 3, [splice.SPLICE_F_MOVE, splice.SPLICE_F_NONBLOCK])

        flags = splice.SPLICE_F_MOVE | splice.SPLICE_F_NONBLOCK
        self.assertEqual(mock_tee.call_args, ((1, 2, 3, flags), {}))

        mock_tee.reset_mock()

        # An empty list means "no flags"
        tee(1, 2, 3, [])
        self.assertEqual(mock_tee.call_args, ((1, 2, 3, 0), {}))

    def test_errno(self):
        '''Test handling of failures'''

        # Invoke EBADF by using a read-only FD as fd_out
        with open('/dev/null', 'r') as fd:
            err = errno.EBADF
            msg = r'\[Errno %d\] tee: %s' % (err, os.strerror(err))

            self.assertRaisesRegexp(IOError, msg, tee, fd, fd, 3, 0)

            # The binding must reset the ctypes copy of `errno` after use
            self.assertEqual(ctypes.get_errno(), 0)

    @patch('swift.common.splice.tee._c_tee', None)
    def test_unavailable(self):
        '''Test exception when unavailable'''

        self.assertRaises(EnvironmentError, tee, 1, 2, 2, 0)

View File

@ -41,6 +41,7 @@ from swift.obj import diskfile
from swift.common import utils from swift.common import utils
from swift.common.utils import hash_path, mkdirs, Timestamp from swift.common.utils import hash_path, mkdirs, Timestamp
from swift.common import ring from swift.common import ring
from swift.common.splice import splice
from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \
DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \ DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \
@ -954,8 +955,7 @@ class TestDiskFileManager(unittest.TestCase):
def test_missing_splice_warning(self): def test_missing_splice_warning(self):
logger = FakeLogger() logger = FakeLogger()
with mock.patch('swift.obj.diskfile.system_has_splice', with mock.patch('swift.common.splice.splice._c_splice', None):
lambda: False):
self.conf['splice'] = 'yes' self.conf['splice'] = 'yes'
mgr = diskfile.DiskFileManager(self.conf, logger) mgr = diskfile.DiskFileManager(self.conf, logger)
@ -2242,7 +2242,7 @@ class TestDiskFile(unittest.TestCase):
self.assertTrue(exp_name in set(dl)) self.assertTrue(exp_name in set(dl))
def _system_can_zero_copy(self): def _system_can_zero_copy(self):
if not utils.system_has_splice(): if not splice.available:
return False return False
try: try:

View File

@ -47,6 +47,7 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory, public, replication NullLogger, storage_directory, public, replication
from swift.common import constraints from swift.common import constraints
from swift.common.swob import Request, HeaderKeyDict from swift.common.swob import Request, HeaderKeyDict
from swift.common.splice import splice
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeviceUnavailable from swift.common.exceptions import DiskFileDeviceUnavailable
@ -4415,7 +4416,7 @@ class TestZeroCopy(unittest.TestCase):
"""Test the object server's zero-copy functionality""" """Test the object server's zero-copy functionality"""
def _system_can_zero_copy(self): def _system_can_zero_copy(self):
if not utils.system_has_splice(): if not splice.available:
return False return False
try: try: