Merge "fsync() on directories"
This commit is contained in:
commit
09474f28b7
@@ -338,12 +338,12 @@ class DatabaseBroker(object):
                                  self.db_type + 's',
                                  os.path.basename(self.db_dir))
         try:
-            renamer(self.db_dir, quar_path)
+            renamer(self.db_dir, quar_path, fsync=False)
         except OSError as e:
             if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
                 raise
             quar_path = "%s-%s" % (quar_path, uuid4().hex)
-            renamer(self.db_dir, quar_path)
+            renamer(self.db_dir, quar_path, fsync=False)
         detail = _('Quarantined %s to %s due to %s database') % \
             (self.db_dir, quar_path, exc_hint)
         self.logger.error(detail)
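All of the quarantine call sites in this change follow the same rename-then-retry idiom: attempt the rename, and if the destination already exists (EEXIST, or ENOTEMPTY for a non-empty directory), retry once under a unique suffix. A minimal standalone sketch of that idiom, using plain os.rename() instead of Swift's renamer() and a hypothetical helper name:

import errno
import os
from uuid import uuid4


def quarantine_once(src_dir, dest_dir):
    """Move src_dir to dest_dir; on a name collision, retry with a suffix."""
    try:
        os.rename(src_dir, dest_dir)
    except OSError as e:
        # Anything other than a destination collision is a real error.
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        # Collision: retry under a practically unique destination name.
        dest_dir = "%s-%s" % (dest_dir, uuid4().hex)
        os.rename(src_dir, dest_dir)
    return dest_dir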
@@ -59,12 +59,12 @@ def quarantine_db(object_file, server_type):
         os.path.join(object_dir, '..', '..', '..', '..', 'quarantined',
                      server_type + 's', os.path.basename(object_dir)))
     try:
-        renamer(object_dir, quarantine_dir)
+        renamer(object_dir, quarantine_dir, fsync=False)
     except OSError as e:
         if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
             raise
         quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
-        renamer(object_dir, quarantine_dir)
+        renamer(object_dir, quarantine_dir, fsync=False)
 
 
 def roundrobin_datadirs(datadirs):
@@ -645,6 +645,27 @@ def fdatasync(fd):
         fsync(fd)
 
 
+def fsync_dir(dirpath):
+    """
+    Sync directory entries to disk.
+
+    :param dirpath: Path to the directory to be synced.
+    """
+    dirfd = None
+    try:
+        dirfd = os.open(dirpath, os.O_DIRECTORY | os.O_RDONLY)
+        fsync(dirfd)
+    except OSError as err:
+        if err.errno == errno.ENOTDIR:
+            # Raise error if someone calls fsync_dir on a non-directory
+            raise
+        logging.warn(_("Unable to perform fsync() on directory %s: %s"),
+                     dirpath, os.strerror(err.errno))
+    finally:
+        if dirfd:
+            os.close(dirfd)
+
+
 def drop_buffer_cache(fd, offset, length):
     """
     Drop 'buffer' cache for the given range of the given file.
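fsync() on a file descriptor flushes the file's data and metadata, but not the directory entry that names the file; after a crash, a freshly created or renamed file can vanish from its directory unless the directory itself was fsync'd. A sketch of the full durable-write sequence that fsync_dir() enables (durable_write() is a hypothetical illustration, not part of this commit):

import os


def fsync_dir(dirpath):
    # Same approach as the function added above: fsync the directory's fd.
    dirfd = os.open(dirpath, os.O_DIRECTORY | os.O_RDONLY)
    try:
        os.fsync(dirfd)
    finally:
        os.close(dirfd)


def durable_write(path, data):
    """Write bytes to path so both content and name survive a crash."""
    tmp = path + '.tmp'
    fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
    try:
        os.write(fd, data)
        os.fsync(fd)                      # 1. persist file contents
    finally:
        os.close(fd)
    os.rename(tmp, path)                  # 2. atomic replace
    fsync_dir(os.path.dirname(path))      # 3. persist the directory entry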
@@ -856,20 +877,66 @@ def mkdirs(path):
             raise
 
 
-def renamer(old, new):
+def makedirs_count(path, count=0):
+    """
+    Same as os.makedirs() except that this method returns the number of
+    new directories that had to be created.
+
+    Also, this does not raise an error if target directory already exists.
+    This behaviour is similar to Python 3.x's os.makedirs() called with
+    exist_ok=True. Also similar to swift.common.utils.mkdirs()
+
+    https://hg.python.org/cpython/file/v3.4.2/Lib/os.py#l212
+    """
+    head, tail = os.path.split(path)
+    if not tail:
+        head, tail = os.path.split(head)
+    if head and tail and not os.path.exists(head):
+        count = makedirs_count(head, count)
+        if tail == os.path.curdir:
+            return
+    try:
+        os.mkdir(path)
+    except OSError as e:
+        # EEXIST may also be raised if path exists as a file
+        # Do not let that pass.
+        if e.errno != errno.EEXIST or not os.path.isdir(path):
+            raise
+    else:
+        count += 1
+    return count
+
+
+def renamer(old, new, fsync=True):
     """
     Attempt to fix / hide race conditions like empty object directories
     being removed by backend processes during uploads, by retrying.
 
+    The containing directory of 'new' and of all newly created directories are
+    fsync'd by default. This _will_ come at a performance penalty. In cases
+    where these additional fsyncs are not necessary, it is expected that the
+    caller of renamer() turn it off explicitly.
+
     :param old: old path to be renamed
     :param new: new path to be renamed to
+    :param fsync: fsync on containing directory of new and also all
+                  the newly created directories.
     """
+    dirpath = os.path.dirname(new)
     try:
-        mkdirs(os.path.dirname(new))
+        count = makedirs_count(dirpath)
         os.rename(old, new)
     except OSError:
-        mkdirs(os.path.dirname(new))
+        count = makedirs_count(dirpath)
         os.rename(old, new)
+    if fsync:
+        # If count=0, no new directories were created. But we still need to
+        # fsync leaf dir after os.rename().
+        # If count>0, starting from leaf dir, fsync parent dirs of all
+        # directories created by makedirs_count()
+        for i in range(0, count + 1):
+            fsync_dir(dirpath)
+            dirpath = os.path.dirname(dirpath)
 
 
 def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
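The loop runs count + 1 times because the renamed file's entry lives in the leaf directory, while each newly created directory's entry lives in its parent; syncing the leaf and then each ancestor up to the first pre-existing directory therefore covers every new entry. A worked example under assumed paths, where only /srv/a exists beforehand:

# renamer('/tmp/x', '/srv/a/b/c/o.data')    # only /srv/a exists
#
# makedirs_count('/srv/a/b/c')  -> creates 'b' and 'c', returns count = 2
# os.rename('/tmp/x', '/srv/a/b/c/o.data')
#
# fsync loop: count + 1 = 3 iterations, walking upward:
#   fsync_dir('/srv/a/b/c')   # persists the entry for 'o.data'
#   fsync_dir('/srv/a/b')     # persists the entry for 'c'
#   fsync_dir('/srv/a')       # persists the entry for 'b'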
@@ -2490,7 +2557,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
         with NamedTemporaryFile(dir=os.path.dirname(cache_file),
                                 delete=False) as tf:
             tf.write(json.dumps(cache_entry) + '\n')
-        os.rename(tf.name, cache_file)
+        renamer(tf.name, cache_file, fsync=False)
     finally:
         try:
             os.unlink(tf.name)
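dump_recon_cache() already used the write-to-temp-then-rename pattern so that readers never observe a partially written cache file; switching os.rename() to renamer(..., fsync=False) additionally creates the parent directory if it is missing, while skipping the directory fsync for this non-critical, frequently rewritten file. A self-contained sketch of the underlying pattern, with a hypothetical helper name:

import json
import os
from tempfile import NamedTemporaryFile


def atomic_json_dump(obj, dest):
    # Write the temp file in the destination directory so the rename
    # stays within one filesystem and is therefore atomic.
    with NamedTemporaryFile(mode='w', dir=os.path.dirname(dest),
                            delete=False) as tf:
        tf.write(json.dumps(obj) + '\n')
    try:
        os.rename(tf.name, dest)
    finally:
        try:
            os.unlink(tf.name)   # ENOENT here means the rename succeeded
        except OSError:
            pass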
@@ -203,12 +203,12 @@ def quarantine_renamer(device_path, corrupted_file_path):
                       basename(from_dir))
     invalidate_hash(dirname(from_dir))
     try:
-        renamer(from_dir, to_dir)
+        renamer(from_dir, to_dir, fsync=False)
     except OSError as e:
         if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
             raise
         to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
-        renamer(from_dir, to_dir)
+        renamer(from_dir, to_dir, fsync=False)
     return to_dir
 
 
@@ -345,6 +345,8 @@ def invalidate_hash(suffix_dir):
     suffix = basename(suffix_dir)
     partition_dir = dirname(suffix_dir)
     hashes_file = join(partition_dir, HASH_FILE)
+    if not os.path.exists(hashes_file):
+        return
     with lock_path(partition_dir):
         try:
             with open(hashes_file, 'rb') as fp:
@@ -216,9 +216,9 @@ class ObjectUpdater(Daemon):
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
             self.logger.increment('quarantines')
-            renamer(update_path, os.path.join(
-                device, 'quarantined', 'objects',
-                os.path.basename(update_path)))
+            target_path = os.path.join(device, 'quarantined', 'objects',
+                                       os.path.basename(update_path))
+            renamer(update_path, target_path, fsync=False)
             return
         successes = update.get('successes', [])
         part, nodes = self.get_container_ring().get_nodes(
@@ -738,7 +738,8 @@ class TestDatabaseBroker(unittest.TestCase):
         dbpath = os.path.join(self.testdir, 'dev', 'dbs', 'par', 'pre', 'db')
         mkdirs(dbpath)
         qpath = os.path.join(self.testdir, 'dev', 'quarantined', 'tests', 'db')
-        with patch('swift.common.db.renamer', lambda a, b: b):
+        with patch('swift.common.db.renamer', lambda a, b,
+                   fsync: b):
             # Test malformed database
             copy(os.path.join(os.path.dirname(__file__),
                               'malformed_example.db'),
@@ -563,7 +563,7 @@ class TestDBReplicator(unittest.TestCase):
         self._patch(patch.object, replicator.brokerclass,
                     'get_repl_missing_table', True)
 
-        def mock_renamer(was, new, cause_colision=False):
+        def mock_renamer(was, new, fsync=False, cause_colision=False):
            if cause_colision and '-' not in new:
                raise OSError(errno.EEXIST, "File already exists")
            self.assertEquals('/a/b/c/d/e', was)
@@ -573,8 +573,8 @@ class TestDBReplicator(unittest.TestCase):
         else:
             self.assertEquals('/a/quarantined/containers/e', new)
 
-        def mock_renamer_error(was, new):
-            return mock_renamer(was, new, cause_colision=True)
+        def mock_renamer_error(was, new, fsync):
+            return mock_renamer(was, new, fsync, cause_colision=True)
         with patch.object(db_replicator, 'renamer', mock_renamer):
             replicator._replicate_object('0', 'file', 'node_id')
             # try the double quarantine
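The test changes above exist because any stub substituted for renamer() must now mirror the new signature; a patched call site passing fsync=False would otherwise raise TypeError. A minimal sketch of a signature-compatible test double:

def fake_renamer(old, new, fsync=True):
    # Accept (and ignore) the new keyword so patched call sites keep working.
    return new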
@@ -2805,6 +2805,113 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertEqual(None, utils.cache_from_env(env, True))
         self.assertEqual(0, len(logger.get_lines_for_level('error')))
 
+    def test_fsync_dir(self):
+
+        tempdir = None
+        fd = None
+        try:
+            tempdir = mkdtemp(dir='/tmp')
+            fd, temppath = tempfile.mkstemp(dir=tempdir)
+
+            _mock_fsync = mock.Mock()
+            _mock_close = mock.Mock()
+
+            with patch('swift.common.utils.fsync', _mock_fsync):
+                with patch('os.close', _mock_close):
+                    utils.fsync_dir(tempdir)
+            self.assertTrue(_mock_fsync.called)
+            self.assertTrue(_mock_close.called)
+            self.assertTrue(isinstance(_mock_fsync.call_args[0][0], int))
+            self.assertEqual(_mock_fsync.call_args[0][0],
+                             _mock_close.call_args[0][0])
+
+            # Not a directory - arg is file path
+            self.assertRaises(OSError, utils.fsync_dir, temppath)
+
+            logger = FakeLogger()
+
+            def _mock_fsync(fd):
+                raise OSError(errno.EBADF, os.strerror(errno.EBADF))
+
+            with patch('swift.common.utils.fsync', _mock_fsync):
+                with mock.patch('swift.common.utils.logging', logger):
+                    utils.fsync_dir(tempdir)
+            self.assertEqual(1, len(logger.get_lines_for_level('warning')))
+
+        finally:
+            if fd is not None:
+                os.close(fd)
+                os.unlink(temppath)
+            if tempdir:
+                os.rmdir(tempdir)
+
+    def test_renamer_with_fsync_dir(self):
+        tempdir = None
+        try:
+            tempdir = mkdtemp(dir='/tmp')
+            # Simulate part of object path already existing
+            part_dir = os.path.join(tempdir, 'objects/1234/')
+            os.makedirs(part_dir)
+            obj_dir = os.path.join(part_dir, 'aaa', 'a' * 32)
+            obj_path = os.path.join(obj_dir, '1425276031.12345.data')
+
+            # Object dir had to be created
+            _m_os_rename = mock.Mock()
+            _m_fsync_dir = mock.Mock()
+            with patch('os.rename', _m_os_rename):
+                with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
+                    utils.renamer("fake_path", obj_path)
+            _m_os_rename.assert_called_once_with('fake_path', obj_path)
+            # fsync_dir on parents of all newly create dirs
+            self.assertEqual(_m_fsync_dir.call_count, 3)
+
+            # Object dir existed
+            _m_os_rename.reset_mock()
+            _m_fsync_dir.reset_mock()
+            with patch('os.rename', _m_os_rename):
+                with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
+                    utils.renamer("fake_path", obj_path)
+            _m_os_rename.assert_called_once_with('fake_path', obj_path)
+            # fsync_dir only on the leaf dir
+            self.assertEqual(_m_fsync_dir.call_count, 1)
+        finally:
+            if tempdir:
+                shutil.rmtree(tempdir)
+
+    def test_renamer_when_fsync_is_false(self):
+        _m_os_rename = mock.Mock()
+        _m_fsync_dir = mock.Mock()
+        _m_makedirs_count = mock.Mock(return_value=2)
+        with patch('os.rename', _m_os_rename):
+            with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
+                with patch('swift.common.utils.makedirs_count',
+                           _m_makedirs_count):
+                    utils.renamer("fake_path", "/a/b/c.data", fsync=False)
+        _m_makedirs_count.assert_called_once_with("/a/b")
+        _m_os_rename.assert_called_once_with('fake_path', "/a/b/c.data")
+        self.assertFalse(_m_fsync_dir.called)
+
+    def test_makedirs_count(self):
+        tempdir = None
+        fd = None
+        try:
+            tempdir = mkdtemp(dir='/tmp')
+            os.makedirs(os.path.join(tempdir, 'a/b'))
+            # 4 new dirs created
+            dirpath = os.path.join(tempdir, 'a/b/1/2/3/4')
+            ret = utils.makedirs_count(dirpath)
+            self.assertEqual(ret, 4)
+            # no new dirs created - dir already exists
+            ret = utils.makedirs_count(dirpath)
+            self.assertEqual(ret, 0)
+            # path exists and is a file
+            fd, temppath = tempfile.mkstemp(dir=dirpath)
+            os.close(fd)
+            self.assertRaises(OSError, utils.makedirs_count, temppath)
+        finally:
+            if tempdir:
+                shutil.rmtree(tempdir)
+
 
 class ResellerConfReader(unittest.TestCase):