From 2a0a8ae00f2d3b7db255b0905b063e930f824f3d Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Tue, 18 Nov 2014 15:57:06 +0100 Subject: [PATCH] Rework `splice` and `tee` This patch reworks the bindings to `splice` and `tee` in order to fix a (potential) bug when using the old `splice` binding and passing offsets (see https://review.openstack.org/#/c/135319/ for a discussion of the issue). The new binding code (based on https://github.com/NicolasT/tee-n-splice) uses more `ctypes` features w.r.t. parameter and return value handling. It also introduces a test suite for both calls. Change-Id: Ib8084ca20fe7a199a00067da9386c2ccf618755c --- swift/common/splice.py | 197 ++++++++++++++++++++++++++ swift/common/utils.py | 64 --------- swift/obj/diskfile.py | 33 ++--- test/unit/common/test_splice.py | 235 ++++++++++++++++++++++++++++++++ test/unit/obj/test_diskfile.py | 6 +- test/unit/obj/test_server.py | 3 +- 6 files changed, 454 insertions(+), 84 deletions(-) create mode 100644 swift/common/splice.py create mode 100644 test/unit/common/test_splice.py diff --git a/swift/common/splice.py b/swift/common/splice.py new file mode 100644 index 0000000000..581e2df48d --- /dev/null +++ b/swift/common/splice.py @@ -0,0 +1,197 @@ +# Copyright (c) 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +''' +Bindings to the `tee` and `splice` system calls +''' + +import os +import ctypes +import ctypes.util + +__all__ = ['tee', 'splice'] + + +c_loff_t = ctypes.c_longlong # loff_t is 64 bits wide, even on 32-bit ABIs + + +class Tee(object): + '''Binding to `tee`''' + + __slots__ = '_c_tee', + + def __init__(self): + libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) + + try: + c_tee = libc.tee + except AttributeError: + self._c_tee = None + return + + c_tee.argtypes = [ + ctypes.c_int, + ctypes.c_int, + ctypes.c_size_t, + ctypes.c_uint + ] + + c_tee.restype = ctypes.c_ssize_t + + def errcheck(result, func, arguments): + if result == -1: + errno = ctypes.set_errno(0) + + raise IOError(errno, 'tee: %s' % os.strerror(errno)) + else: + return result + + c_tee.errcheck = errcheck + + self._c_tee = c_tee + + def __call__(self, fd_in, fd_out, len_, flags): + '''See `man 2 tee` + + File-descriptors can be file-like objects with a `fileno` method, or + integers. + + Flags can be an integer value, or a list of flags (exposed on + `splice`). + + This function returns the number of bytes transferred (i.e. the actual + result of the call to `tee`). + + Upon other errors, an `IOError` is raised with the proper `errno` set. 
+ ''' + + if not self.available: + raise EnvironmentError('tee not available') + + if not isinstance(flags, (int, long)): + c_flags = reduce(lambda a, b: a | b, flags, 0) + else: + c_flags = flags + + c_fd_in = getattr(fd_in, 'fileno', lambda: fd_in)() + c_fd_out = getattr(fd_out, 'fileno', lambda: fd_out)() + + return self._c_tee(c_fd_in, c_fd_out, len_, c_flags) + + @property + def available(self): + '''Availability of `tee`''' + + return self._c_tee is not None + +tee = Tee() +del Tee + + +class Splice(object): + '''Binding to `splice`''' + + # From `bits/fcntl-linux.h` + SPLICE_F_MOVE = 1 + SPLICE_F_NONBLOCK = 2 + SPLICE_F_MORE = 4 + SPLICE_F_GIFT = 8 + + __slots__ = '_c_splice', + + def __init__(self): + libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) + + try: + c_splice = libc.splice + except AttributeError: + self._c_splice = None + return + + c_loff_t_p = ctypes.POINTER(c_loff_t) + + c_splice.argtypes = [ + ctypes.c_int, c_loff_t_p, + ctypes.c_int, c_loff_t_p, + ctypes.c_size_t, + ctypes.c_uint + ] + + c_splice.restype = ctypes.c_ssize_t + + def errcheck(result, func, arguments): + if result == -1: + errno = ctypes.set_errno(0) + + raise IOError(errno, 'splice: %s' % os.strerror(errno)) + else: + off_in = arguments[1] + off_out = arguments[3] + + return ( + result, + off_in.contents.value if off_in is not None else None, + off_out.contents.value if off_out is not None else None) + + c_splice.errcheck = errcheck + + self._c_splice = c_splice + + def __call__(self, fd_in, off_in, fd_out, off_out, len_, flags): + '''See `man 2 splice` + + File-descriptors can be file-like objects with a `fileno` method, or + integers. + + Flags can be an integer value, or a list of flags (exposed on this + object). + + Returns a tuple of the result of the `splice` call, the output value of + `off_in` and the output value of `off_out` (or `None` for any of these + output values, if applicable). 
+ + Upon other errors, an `IOError` is raised with the proper `errno` set. + + Note: if you want to pass `NULL` as value for `off_in` or `off_out` to + the system call, you must pass `None`, *not* 0! + ''' + + if not self.available: + raise EnvironmentError('splice not available') + + if not isinstance(flags, (int, long)): + c_flags = reduce(lambda a, b: a | b, flags, 0) + else: + c_flags = flags + + c_fd_in = getattr(fd_in, 'fileno', lambda: fd_in)() + c_fd_out = getattr(fd_out, 'fileno', lambda: fd_out)() + + c_off_in = \ + ctypes.pointer(c_loff_t(off_in)) if off_in is not None else None + c_off_out = \ + ctypes.pointer(c_loff_t(off_out)) if off_out is not None else None + + return self._c_splice( + c_fd_in, c_off_in, c_fd_out, c_off_out, len_, c_flags) + + @property + def available(self): + '''Availability of `splice`''' + + return self._c_splice is not None + +splice = Splice() +del Splice diff --git a/swift/common/utils.py b/swift/common/utils.py index 239d7f0675..b10294e7be 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -87,8 +87,6 @@ _posix_fadvise = None _libc_socket = None _libc_bind = None _libc_accept = None -_libc_splice = None -_libc_tee = None # If set to non-zero, fallocate routines will fail based on free space # available being at or below this amount, in bytes. @@ -3215,65 +3213,3 @@ def get_md5_socket(): raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") return md5_sockfd - - -# Flags for splice() and tee() -SPLICE_F_MOVE = 1 -SPLICE_F_NONBLOCK = 2 -SPLICE_F_MORE = 4 -SPLICE_F_GIFT = 8 - - -def splice(fd_in, off_in, fd_out, off_out, length, flags): - """ - Calls splice - a Linux-specific syscall for zero-copy data movement. - - On success, returns the number of bytes moved. - - On failure where errno is EWOULDBLOCK, returns None. - - On all other failures, raises IOError. 
- """ - global _libc_splice - if _libc_splice is None: - _libc_splice = load_libc_function('splice', fail_if_missing=True) - - ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in), - ctypes.c_int(fd_out), ctypes.c_long(off_out), - ctypes.c_int(length), ctypes.c_int(flags)) - if ret < 0: - err = ctypes.get_errno() - if err == errno.EWOULDBLOCK: - return None - else: - raise IOError(err, "splice() failed: %s" % os.strerror(err)) - return ret - - -def tee(fd_in, fd_out, length, flags): - """ - Calls tee - a Linux-specific syscall to let pipes share data. - - On success, returns the number of bytes "copied". - - On failure, raises IOError. - """ - global _libc_tee - if _libc_tee is None: - _libc_tee = load_libc_function('tee', fail_if_missing=True) - - ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out), - ctypes.c_int(length), ctypes.c_int(flags)) - if ret < 0: - err = ctypes.get_errno() - raise IOError(err, "tee() failed: %s" % os.strerror(err)) - return ret - - -def system_has_splice(): - global _libc_splice - try: - _libc_splice = load_libc_function('splice', fail_if_missing=True) - return True - except AttributeError: - return False diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index f828a16d02..6eef04aa3d 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -56,8 +56,8 @@ from swift.common.utils import mkdirs, Timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ config_true_value, listdir, split_path, ismount, remove_file, \ - get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \ - F_SETPIPE_SZ + get_md5_socket, F_SETPIPE_SZ +from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ @@ -544,17 +544,15 @@ class DiskFileManager(object): 
self.use_splice = False self.pipe_size = None - splice_available = system_has_splice() - conf_wants_splice = config_true_value(conf.get('splice', 'no')) # If the operator wants zero-copy with splice() but we don't have the # requisite kernel support, complain so they can go fix it. - if conf_wants_splice and not splice_available: + if conf_wants_splice and not splice.available: self.logger.warn( "Use of splice() requested (config says \"splice = %s\"), " "but the system does not support it. " "splice() will not be used." % conf.get('splice')) - elif conf_wants_splice and splice_available: + elif conf_wants_splice and splice.available: try: sockfd = get_md5_socket() os.close(sockfd) @@ -1006,8 +1004,8 @@ class DiskFileReader(object): try: while True: # Read data from disk to pipe - bytes_in_pipe = self._threadpool.run_in_thread( - splice, rfd, 0, client_wpipe, 0, pipe_size, 0) + (bytes_in_pipe, _1, _2) = self._threadpool.run_in_thread( + splice, rfd, None, client_wpipe, None, pipe_size, 0) if bytes_in_pipe == 0: self._read_to_eof = True self._drop_cache(rfd, dropped_cache, @@ -1038,20 +1036,23 @@ class DiskFileReader(object): # bytes in it, so read won't block, and we're splicing it into # an MD5 socket, which synchronously hashes any data sent to # it, so writing won't block either. - hashed = splice(hash_rpipe, 0, md5_sockfd, 0, - bytes_in_pipe, SPLICE_F_MORE) + (hashed, _1, _2) = splice(hash_rpipe, None, md5_sockfd, None, + bytes_in_pipe, splice.SPLICE_F_MORE) if hashed != bytes_in_pipe: raise Exception("md5 socket didn't take all the data? 
" "(tried to write %d, but wrote %d)" % (bytes_in_pipe, hashed)) while bytes_in_pipe > 0: - sent = splice(client_rpipe, 0, wsockfd, 0, - bytes_in_pipe, 0) - if sent is None: # would have blocked - trampoline(wsockfd, write=True) - else: - bytes_in_pipe -= sent + try: + res = splice(client_rpipe, None, wsockfd, None, + bytes_in_pipe, 0) + bytes_in_pipe -= res[0] + except IOError as exc: + if exc.errno == errno.EWOULDBLOCK: + trampoline(wsockfd, write=True) + else: + raise if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW: self._drop_cache(rfd, dropped_cache, diff --git a/test/unit/common/test_splice.py b/test/unit/common/test_splice.py new file mode 100644 index 0000000000..72c3bff5e5 --- /dev/null +++ b/test/unit/common/test_splice.py @@ -0,0 +1,235 @@ +# Copyright (c) 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +'''Tests for `swift.common.splice`''' + +import os +import errno +import ctypes +import logging +import tempfile +import unittest +import contextlib + +from mock import patch + +from swift.common.splice import splice, tee + +LOGGER = logging.getLogger(__name__) + + +def safe_close(fd): + '''Close a file descriptor, ignoring any exceptions''' + + try: + os.close(fd) + except Exception: + LOGGER.exception('Error while closing FD') + + +@contextlib.contextmanager +def pipe(): + '''Context-manager providing 2 ends of a pipe, closing them at exit''' + + fds = os.pipe() + + try: + yield fds + finally: + safe_close(fds[0]) + safe_close(fds[1]) + + +@unittest.skipUnless(splice.available, 'splice not available') +class TestSplice(unittest.TestCase): + '''Tests for `splice`''' + + def test_flags(self): + '''Test flag attribute availability''' + + self.assert_(hasattr(splice, 'SPLICE_F_MOVE')) + self.assert_(hasattr(splice, 'SPLICE_F_NONBLOCK')) + self.assert_(hasattr(splice, 'SPLICE_F_MORE')) + self.assert_(hasattr(splice, 'SPLICE_F_GIFT')) + + @patch('swift.common.splice.splice._c_splice', None) + def test_available(self): + '''Test `available` attribute correctness''' + + self.assertFalse(splice.available) + + def test_splice_pipe_to_pipe(self): + '''Test `splice` from a pipe to a pipe''' + + with pipe() as (p1a, p1b), pipe() as (p2a, p2b): + os.write(p1b, 'abcdef') + res = splice(p1a, None, p2b, None, 3, 0) + self.assertEqual(res, (3, None, None)) + self.assertEqual(os.read(p2a, 3), 'abc') + self.assertEqual(os.read(p1a, 3), 'def') + + def test_splice_file_to_pipe(self): + '''Test `splice` from a file to a pipe''' + + with tempfile.NamedTemporaryFile(bufsize=0) as fd, pipe() as (pa, pb): + fd.write('abcdef') + fd.seek(0, os.SEEK_SET) + + res = splice(fd, None, pb, None, 3, 0) + self.assertEqual(res, (3, None, None)) + # `fd.tell()` isn't updated... 
+ self.assertEqual(os.lseek(fd.fileno(), 0, os.SEEK_CUR), 3) + + fd.seek(0, os.SEEK_SET) + res = splice(fd, 3, pb, None, 3, 0) + self.assertEqual(res, (3, 6, None)) + self.assertEqual(os.lseek(fd.fileno(), 0, os.SEEK_CUR), 0) + + self.assertEquals(os.read(pa, 6), 'abcdef') + + def test_splice_pipe_to_file(self): + '''Test `splice` from a pipe to a file''' + + with tempfile.NamedTemporaryFile(bufsize=0) as fd, pipe() as (pa, pb): + os.write(pb, 'abcdef') + + res = splice(pa, None, fd, None, 3, 0) + self.assertEqual(res, (3, None, None)) + self.assertEqual(fd.tell(), 3) + + fd.seek(0, os.SEEK_SET) + + res = splice(pa, None, fd, 3, 3, 0) + self.assertEqual(res, (3, None, 6)) + self.assertEqual(fd.tell(), 0) + + self.assertEqual(fd.read(6), 'abcdef') + + @patch.object(splice, '_c_splice') + def test_fileno(self, mock_splice): + '''Test handling of file-descriptors''' + + splice(1, None, 2, None, 3, 0) + self.assertEqual(mock_splice.call_args, + ((1, None, 2, None, 3, 0), {})) + + mock_splice.reset_mock() + + with open('/dev/zero', 'r') as fd: + splice(fd, None, fd, None, 3, 0) + self.assertEqual(mock_splice.call_args, + ((fd.fileno(), None, fd.fileno(), None, 3, 0), + {})) + + @patch.object(splice, '_c_splice') + def test_flags_list(self, mock_splice): + '''Test handling of flag lists''' + + splice(1, None, 2, None, 3, + [splice.SPLICE_F_MOVE, splice.SPLICE_F_NONBLOCK]) + + flags = splice.SPLICE_F_MOVE | splice.SPLICE_F_NONBLOCK + self.assertEqual(mock_splice.call_args, + ((1, None, 2, None, 3, flags), {})) + + mock_splice.reset_mock() + + splice(1, None, 2, None, 3, []) + self.assertEqual(mock_splice.call_args, + ((1, None, 2, None, 3, 0), {})) + + def test_errno(self): + '''Test handling of failures''' + + # Invoke EBADF by using a read-only FD as fd_out + with open('/dev/null', 'r') as fd: + err = errno.EBADF + msg = r'\[Errno %d\] splice: %s' % (err, os.strerror(err)) + + self.assertRaisesRegexp(IOError, msg, splice, fd, None, fd, None, + 3, 0) + + 
self.assertEqual(ctypes.get_errno(), 0) + + @patch('swift.common.splice.splice._c_splice', None) + def test_unavailable(self): + '''Test exception when unavailable''' + + self.assertRaises(EnvironmentError, splice, 1, None, 2, None, 2, 0) + + +@unittest.skipUnless(tee.available, 'tee not available') +class TestTee(unittest.TestCase): + '''Tests for `tee`''' + + @patch('swift.common.splice.tee._c_tee', None) + def test_available(self): + '''Test `available` attribute correctness''' + + self.assertFalse(tee.available) + + def test_tee_pipe_to_pipe(self): + '''Test `tee` from a pipe to a pipe''' + + with pipe() as (p1a, p1b), pipe() as (p2a, p2b): + os.write(p1b, 'abcdef') + res = tee(p1a, p2b, 3, 0) + self.assertEqual(res, 3) + self.assertEqual(os.read(p2a, 3), 'abc') + self.assertEqual(os.read(p1a, 6), 'abcdef') + + @patch.object(tee, '_c_tee') + def test_fileno(self, mock_tee): + '''Test handling of file-descriptors''' + + with pipe() as (pa, pb): + tee(pa, pb, 3, 0) + self.assertEqual(mock_tee.call_args, ((pa, pb, 3, 0), {})) + + mock_tee.reset_mock() + + tee(os.fdopen(pa, 'r'), os.fdopen(pb, 'w'), 3, 0) + self.assertEqual(mock_tee.call_args, ((pa, pb, 3, 0), {})) + + @patch.object(tee, '_c_tee') + def test_flags_list(self, mock_tee): + '''Test handling of flag lists''' + + tee(1, 2, 3, [splice.SPLICE_F_MOVE, splice.SPLICE_F_NONBLOCK]) + flags = splice.SPLICE_F_MOVE | splice.SPLICE_F_NONBLOCK + self.assertEqual(mock_tee.call_args, ((1, 2, 3, flags), {})) + + mock_tee.reset_mock() + + tee(1, 2, 3, []) + self.assertEqual(mock_tee.call_args, ((1, 2, 3, 0), {})) + + def test_errno(self): + '''Test handling of failures''' + + # Invoke EBADF by using a read-only FD as fd_out + with open('/dev/null', 'r') as fd: + err = errno.EBADF + msg = r'\[Errno %d\] tee: %s' % (err, os.strerror(err)) + + self.assertRaisesRegexp(IOError, msg, tee, fd, fd, 3, 0) + + self.assertEqual(ctypes.get_errno(), 0) + + @patch('swift.common.splice.tee._c_tee', None) + def 
test_unavailable(self): + '''Test exception when unavailable''' + + self.assertRaises(EnvironmentError, tee, 1, 2, 2, 0) diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 74f8d2cd4e..162ae5493a 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -41,6 +41,7 @@ from swift.obj import diskfile from swift.common import utils from swift.common.utils import hash_path, mkdirs, Timestamp from swift.common import ring +from swift.common.splice import splice from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ DiskFileError, ReplicationLockTimeout, PathNotDir, DiskFileCollision, \ @@ -954,8 +955,7 @@ class TestDiskFileManager(unittest.TestCase): def test_missing_splice_warning(self): logger = FakeLogger() - with mock.patch('swift.obj.diskfile.system_has_splice', - lambda: False): + with mock.patch('swift.common.splice.splice._c_splice', None): self.conf['splice'] = 'yes' mgr = diskfile.DiskFileManager(self.conf, logger) @@ -2242,7 +2242,7 @@ class TestDiskFile(unittest.TestCase): self.assertTrue(exp_name in set(dl)) def _system_can_zero_copy(self): - if not utils.system_has_splice(): + if not splice.available: return False try: diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 27bc9637ff..d0e9b583a3 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -47,6 +47,7 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory, public, replication from swift.common import constraints from swift.common.swob import Request, HeaderKeyDict +from swift.common.splice import splice from swift.common.storage_policy import POLICIES from swift.common.exceptions import DiskFileDeviceUnavailable @@ -4415,7 +4416,7 @@ class TestZeroCopy(unittest.TestCase): """Test the object server's zero-copy functionality""" def _system_can_zero_copy(self): 
- if not utils.system_has_splice(): + if not splice.available: return False try: