fsync() on directories
The renamer() method now does an fsync on the containing directory of the target path, and also on the parent directories of any newly created directories, by default. This can be explicitly turned off per call in cases where it is not necessary (for example, quarantines). The following article explains why this is necessary: http://lwn.net/Articles/457667/ Although it may seem like the right thing to do, this change does come at a performance penalty; note that no configuration-file option is provided to turn it off globally — callers must pass fsync=False explicitly where they want to skip it. Also, lock_path() inside invalidate_hash() was always creating part of the object path in the filesystem; those directories were never fsync'd. This has been fixed. Change-Id: Id8e02f84f48370edda7fb0c46e030db3b53a71e3 Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
parent
23f55b2ebc
commit
a6f630f27c
@ -338,12 +338,12 @@ class DatabaseBroker(object):
|
||||
self.db_type + 's',
|
||||
os.path.basename(self.db_dir))
|
||||
try:
|
||||
renamer(self.db_dir, quar_path)
|
||||
renamer(self.db_dir, quar_path, fsync=False)
|
||||
except OSError as e:
|
||||
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||
raise
|
||||
quar_path = "%s-%s" % (quar_path, uuid4().hex)
|
||||
renamer(self.db_dir, quar_path)
|
||||
renamer(self.db_dir, quar_path, fsync=False)
|
||||
detail = _('Quarantined %s to %s due to %s database') % \
|
||||
(self.db_dir, quar_path, exc_hint)
|
||||
self.logger.error(detail)
|
||||
|
@ -59,12 +59,12 @@ def quarantine_db(object_file, server_type):
|
||||
os.path.join(object_dir, '..', '..', '..', '..', 'quarantined',
|
||||
server_type + 's', os.path.basename(object_dir)))
|
||||
try:
|
||||
renamer(object_dir, quarantine_dir)
|
||||
renamer(object_dir, quarantine_dir, fsync=False)
|
||||
except OSError as e:
|
||||
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||
raise
|
||||
quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
|
||||
renamer(object_dir, quarantine_dir)
|
||||
renamer(object_dir, quarantine_dir, fsync=False)
|
||||
|
||||
|
||||
def roundrobin_datadirs(datadirs):
|
||||
|
@ -645,6 +645,27 @@ def fdatasync(fd):
|
||||
fsync(fd)
|
||||
|
||||
|
||||
def fsync_dir(dirpath):
    """
    Sync directory entries to disk.

    Opens the directory read-only, fsyncs the resulting descriptor and
    closes it again. Most errors (e.g. EBADF from a filesystem that does
    not support fsync on directories) are logged as warnings and
    swallowed, since a failed directory sync is a best-effort durability
    measure; ENOTDIR is re-raised because it indicates a caller bug.

    :param dirpath: Path to the directory to be synced.
    :raises OSError: with errno ENOTDIR if dirpath is not a directory.
    """
    dirfd = None
    try:
        dirfd = os.open(dirpath, os.O_DIRECTORY | os.O_RDONLY)
        fsync(dirfd)
    except OSError as err:
        if err.errno == errno.ENOTDIR:
            # Raise error if someone calls fsync_dir on a non-directory
            raise
        logging.warning(_("Unable to perform fsync() on directory %s: %s"),
                        dirpath, os.strerror(err.errno))
    finally:
        # File descriptor 0 is a valid descriptor but falsy, so the
        # check must be against None to avoid leaking it.
        if dirfd is not None:
            os.close(dirfd)
|
||||
|
||||
|
||||
def drop_buffer_cache(fd, offset, length):
|
||||
"""
|
||||
Drop 'buffer' cache for the given range of the given file.
|
||||
@ -856,20 +877,66 @@ def mkdirs(path):
|
||||
raise
|
||||
|
||||
|
||||
def renamer(old, new):
|
||||
def makedirs_count(path, count=0):
    """
    Same as os.makedirs() except that this method returns the number of
    new directories that had to be created.

    Also, this does not raise an error if target directory already exists.
    This behaviour is similar to Python 3.x's os.makedirs() called with
    exist_ok=True. Also similar to swift.common.utils.mkdirs()

    https://hg.python.org/cpython/file/v3.4.2/Lib/os.py#l212

    :param path: directory path to create (parents are created as needed)
    :param count: running total used by the recursive calls; callers
                  normally omit it
    :returns: number of directories actually created
    :raises OSError: if a path component exists but is not a directory
    """
    head, tail = os.path.split(path)
    if not tail:
        # Path ended with a separator; split again to get the real leaf.
        head, tail = os.path.split(head)
    if head and tail and not os.path.exists(head):
        count = makedirs_count(head, count)
        if tail == os.path.curdir:
            # Nothing left to create for a '.' leaf, but still report the
            # count of directories created so far (a bare ``return`` here
            # would hand ``None`` back to callers that use the count).
            return count
    try:
        os.mkdir(path)
    except OSError as e:
        # EEXIST may also be raised if path exists as a file
        # Do not let that pass.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
    else:
        count += 1
    return count
|
||||
|
||||
|
||||
def renamer(old, new, fsync=True):
    """
    Attempt to fix / hide race conditions like empty object directories
    being removed by backend processes during uploads, by retrying.

    The containing directory of 'new' and of all newly created directories
    are fsync'd by default. This _will_ come at a performance penalty. In
    cases where these additional fsyncs are not necessary, it is expected
    that the caller of renamer() turn it off explicitly.

    :param old: old path to be renamed
    :param new: new path to be renamed to
    :param fsync: fsync on containing directory of new and also all
                  the newly created directories.
    """
    dirpath = os.path.dirname(new)
    # One retry: the target's parent may be racily removed between the
    # makedirs and the rename, so recreate it and try again. A second
    # failure propagates to the caller.
    for attempt in (1, 2):
        try:
            count = makedirs_count(dirpath)
            os.rename(old, new)
            break
        except OSError:
            if attempt == 2:
                raise
    if fsync:
        # count == 0 means no directory was created, but the leaf dir
        # still has to be synced after os.rename(). For count > 0, walk
        # upward from the leaf and sync the parent of every directory
        # that makedirs_count() created.
        sync_path = dirpath
        for _ in range(count + 1):
            fsync_dir(sync_path)
            sync_path = os.path.dirname(sync_path)
|
||||
|
||||
|
||||
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
|
||||
@ -2490,7 +2557,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
|
||||
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
|
||||
delete=False) as tf:
|
||||
tf.write(json.dumps(cache_entry) + '\n')
|
||||
os.rename(tf.name, cache_file)
|
||||
renamer(tf.name, cache_file, fsync=False)
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tf.name)
|
||||
|
@ -203,12 +203,12 @@ def quarantine_renamer(device_path, corrupted_file_path):
|
||||
basename(from_dir))
|
||||
invalidate_hash(dirname(from_dir))
|
||||
try:
|
||||
renamer(from_dir, to_dir)
|
||||
renamer(from_dir, to_dir, fsync=False)
|
||||
except OSError as e:
|
||||
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||
raise
|
||||
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
|
||||
renamer(from_dir, to_dir)
|
||||
renamer(from_dir, to_dir, fsync=False)
|
||||
return to_dir
|
||||
|
||||
|
||||
@ -345,6 +345,8 @@ def invalidate_hash(suffix_dir):
|
||||
suffix = basename(suffix_dir)
|
||||
partition_dir = dirname(suffix_dir)
|
||||
hashes_file = join(partition_dir, HASH_FILE)
|
||||
if not os.path.exists(hashes_file):
|
||||
return
|
||||
with lock_path(partition_dir):
|
||||
try:
|
||||
with open(hashes_file, 'rb') as fp:
|
||||
|
@ -216,9 +216,9 @@ class ObjectUpdater(Daemon):
|
||||
self.logger.exception(
|
||||
_('ERROR Pickle problem, quarantining %s'), update_path)
|
||||
self.logger.increment('quarantines')
|
||||
renamer(update_path, os.path.join(
|
||||
device, 'quarantined', 'objects',
|
||||
os.path.basename(update_path)))
|
||||
target_path = os.path.join(device, 'quarantined', 'objects',
|
||||
os.path.basename(update_path))
|
||||
renamer(update_path, target_path, fsync=False)
|
||||
return
|
||||
successes = update.get('successes', [])
|
||||
part, nodes = self.get_container_ring().get_nodes(
|
||||
|
@ -738,7 +738,8 @@ class TestDatabaseBroker(unittest.TestCase):
|
||||
dbpath = os.path.join(self.testdir, 'dev', 'dbs', 'par', 'pre', 'db')
|
||||
mkdirs(dbpath)
|
||||
qpath = os.path.join(self.testdir, 'dev', 'quarantined', 'tests', 'db')
|
||||
with patch('swift.common.db.renamer', lambda a, b: b):
|
||||
with patch('swift.common.db.renamer', lambda a, b,
|
||||
fsync: b):
|
||||
# Test malformed database
|
||||
copy(os.path.join(os.path.dirname(__file__),
|
||||
'malformed_example.db'),
|
||||
|
@ -563,7 +563,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, replicator.brokerclass,
|
||||
'get_repl_missing_table', True)
|
||||
|
||||
def mock_renamer(was, new, cause_colision=False):
|
||||
def mock_renamer(was, new, fsync=False, cause_colision=False):
|
||||
if cause_colision and '-' not in new:
|
||||
raise OSError(errno.EEXIST, "File already exists")
|
||||
self.assertEquals('/a/b/c/d/e', was)
|
||||
@ -573,8 +573,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
else:
|
||||
self.assertEquals('/a/quarantined/containers/e', new)
|
||||
|
||||
def mock_renamer_error(was, new):
|
||||
return mock_renamer(was, new, cause_colision=True)
|
||||
def mock_renamer_error(was, new, fsync):
|
||||
return mock_renamer(was, new, fsync, cause_colision=True)
|
||||
with patch.object(db_replicator, 'renamer', mock_renamer):
|
||||
replicator._replicate_object('0', 'file', 'node_id')
|
||||
# try the double quarantine
|
||||
|
@ -2805,6 +2805,113 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
self.assertEqual(None, utils.cache_from_env(env, True))
|
||||
self.assertEqual(0, len(logger.get_lines_for_level('error')))
|
||||
|
||||
def test_fsync_dir(self):
|
||||
|
||||
tempdir = None
|
||||
fd = None
|
||||
try:
|
||||
tempdir = mkdtemp(dir='/tmp')
|
||||
fd, temppath = tempfile.mkstemp(dir=tempdir)
|
||||
|
||||
_mock_fsync = mock.Mock()
|
||||
_mock_close = mock.Mock()
|
||||
|
||||
with patch('swift.common.utils.fsync', _mock_fsync):
|
||||
with patch('os.close', _mock_close):
|
||||
utils.fsync_dir(tempdir)
|
||||
self.assertTrue(_mock_fsync.called)
|
||||
self.assertTrue(_mock_close.called)
|
||||
self.assertTrue(isinstance(_mock_fsync.call_args[0][0], int))
|
||||
self.assertEqual(_mock_fsync.call_args[0][0],
|
||||
_mock_close.call_args[0][0])
|
||||
|
||||
# Not a directory - arg is file path
|
||||
self.assertRaises(OSError, utils.fsync_dir, temppath)
|
||||
|
||||
logger = FakeLogger()
|
||||
|
||||
def _mock_fsync(fd):
|
||||
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
|
||||
|
||||
with patch('swift.common.utils.fsync', _mock_fsync):
|
||||
with mock.patch('swift.common.utils.logging', logger):
|
||||
utils.fsync_dir(tempdir)
|
||||
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
|
||||
|
||||
finally:
|
||||
if fd is not None:
|
||||
os.close(fd)
|
||||
os.unlink(temppath)
|
||||
if tempdir:
|
||||
os.rmdir(tempdir)
|
||||
|
||||
def test_renamer_with_fsync_dir(self):
|
||||
tempdir = None
|
||||
try:
|
||||
tempdir = mkdtemp(dir='/tmp')
|
||||
# Simulate part of object path already existing
|
||||
part_dir = os.path.join(tempdir, 'objects/1234/')
|
||||
os.makedirs(part_dir)
|
||||
obj_dir = os.path.join(part_dir, 'aaa', 'a' * 32)
|
||||
obj_path = os.path.join(obj_dir, '1425276031.12345.data')
|
||||
|
||||
# Object dir had to be created
|
||||
_m_os_rename = mock.Mock()
|
||||
_m_fsync_dir = mock.Mock()
|
||||
with patch('os.rename', _m_os_rename):
|
||||
with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
|
||||
utils.renamer("fake_path", obj_path)
|
||||
_m_os_rename.assert_called_once_with('fake_path', obj_path)
|
||||
# fsync_dir on parents of all newly create dirs
|
||||
self.assertEqual(_m_fsync_dir.call_count, 3)
|
||||
|
||||
# Object dir existed
|
||||
_m_os_rename.reset_mock()
|
||||
_m_fsync_dir.reset_mock()
|
||||
with patch('os.rename', _m_os_rename):
|
||||
with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
|
||||
utils.renamer("fake_path", obj_path)
|
||||
_m_os_rename.assert_called_once_with('fake_path', obj_path)
|
||||
# fsync_dir only on the leaf dir
|
||||
self.assertEqual(_m_fsync_dir.call_count, 1)
|
||||
finally:
|
||||
if tempdir:
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
def test_renamer_when_fsync_is_false(self):
|
||||
_m_os_rename = mock.Mock()
|
||||
_m_fsync_dir = mock.Mock()
|
||||
_m_makedirs_count = mock.Mock(return_value=2)
|
||||
with patch('os.rename', _m_os_rename):
|
||||
with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
|
||||
with patch('swift.common.utils.makedirs_count',
|
||||
_m_makedirs_count):
|
||||
utils.renamer("fake_path", "/a/b/c.data", fsync=False)
|
||||
_m_makedirs_count.assert_called_once_with("/a/b")
|
||||
_m_os_rename.assert_called_once_with('fake_path', "/a/b/c.data")
|
||||
self.assertFalse(_m_fsync_dir.called)
|
||||
|
||||
def test_makedirs_count(self):
|
||||
tempdir = None
|
||||
fd = None
|
||||
try:
|
||||
tempdir = mkdtemp(dir='/tmp')
|
||||
os.makedirs(os.path.join(tempdir, 'a/b'))
|
||||
# 4 new dirs created
|
||||
dirpath = os.path.join(tempdir, 'a/b/1/2/3/4')
|
||||
ret = utils.makedirs_count(dirpath)
|
||||
self.assertEqual(ret, 4)
|
||||
# no new dirs created - dir already exists
|
||||
ret = utils.makedirs_count(dirpath)
|
||||
self.assertEqual(ret, 0)
|
||||
# path exists and is a file
|
||||
fd, temppath = tempfile.mkstemp(dir=dirpath)
|
||||
os.close(fd)
|
||||
self.assertRaises(OSError, utils.makedirs_count, temppath)
|
||||
finally:
|
||||
if tempdir:
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
|
||||
class ResellerConfReader(unittest.TestCase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user