Merge "obj: _finalize_durable may succeed even when data file is missing"
This commit is contained in:
commit
c83d42f790
@ -3031,7 +3031,8 @@ class ECDiskFileReader(BaseDiskFileReader):
|
||||
|
||||
class ECDiskFileWriter(BaseDiskFileWriter):
|
||||
|
||||
def _finalize_durable(self, data_file_path, durable_data_file_path):
|
||||
def _finalize_durable(self, data_file_path, durable_data_file_path,
|
||||
timestamp):
|
||||
exc = None
|
||||
new_data_file_path = new_durable_data_file_path = None
|
||||
if self.next_part_power:
|
||||
@ -3055,6 +3056,21 @@ class ECDiskFileWriter(BaseDiskFileWriter):
|
||||
exc)
|
||||
|
||||
except (OSError, IOError) as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
files = os.listdir(self._datadir)
|
||||
results = self.manager.get_ondisk_files(
|
||||
files, self._datadir,
|
||||
frag_index=self._diskfile._frag_index,
|
||||
policy=self._diskfile.policy)
|
||||
# We "succeeded" if another writer cleaned up our data
|
||||
ts_info = results.get('ts_info')
|
||||
durables = results.get('durable_frag_set', [])
|
||||
if ts_info and ts_info['timestamp'] > timestamp:
|
||||
return
|
||||
elif any(frag_set['timestamp'] > timestamp
|
||||
for frag_set in durables):
|
||||
return
|
||||
|
||||
if err.errno not in (errno.ENOSPC, errno.EDQUOT):
|
||||
# re-raise to catch all handler
|
||||
raise
|
||||
@ -3102,7 +3118,8 @@ class ECDiskFileWriter(BaseDiskFileWriter):
|
||||
self._datadir, self.manager.make_on_disk_filename(
|
||||
timestamp, '.data', self._diskfile._frag_index, durable=True))
|
||||
tpool.execute(
|
||||
self._finalize_durable, data_file_path, durable_data_file_path)
|
||||
self._finalize_durable, data_file_path, durable_data_file_path,
|
||||
timestamp)
|
||||
|
||||
def put(self, metadata):
|
||||
"""
|
||||
|
@ -24,6 +24,7 @@ import mock
|
||||
import unittest
|
||||
import email
|
||||
import tempfile
|
||||
import threading
|
||||
import uuid
|
||||
import xattr
|
||||
import re
|
||||
@ -3705,6 +3706,91 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
# can close again
|
||||
writer.close()
|
||||
|
||||
def test_disk_file_concurrent_writes(self):
|
||||
def threadA(df, events, errors):
|
||||
try:
|
||||
ts = self.ts()
|
||||
with df.create() as writer:
|
||||
writer.write(b'dataA')
|
||||
writer.put({
|
||||
'X-Timestamp': ts.internal,
|
||||
'Content-Length': 5,
|
||||
})
|
||||
events[0].set()
|
||||
events[1].wait()
|
||||
writer.commit(ts)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
raise
|
||||
|
||||
def threadB(df, events, errors):
|
||||
try:
|
||||
events[0].wait()
|
||||
ts = self.ts()
|
||||
with df.create() as writer:
|
||||
writer.write(b'dataB')
|
||||
writer.put({
|
||||
'X-Timestamp': ts.internal,
|
||||
'Content-Length': 5,
|
||||
})
|
||||
writer.commit(ts)
|
||||
events[1].set()
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
raise
|
||||
|
||||
df = self._simple_get_diskfile()
|
||||
events = [threading.Event(), threading.Event()]
|
||||
errors = []
|
||||
|
||||
threads = [threading.Thread(target=tgt, args=(df, events, errors))
|
||||
for tgt in (threadA, threadB)]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
self.assertFalse(errors)
|
||||
|
||||
with df.open(), open(df._data_file, 'rb') as fp:
|
||||
self.assertEqual(b'dataB', fp.read())
|
||||
|
||||
def test_disk_file_concurrent_delete(self):
|
||||
def threadA(df, events, errors):
|
||||
try:
|
||||
ts = self.ts()
|
||||
with df.create() as writer:
|
||||
writer.write(b'dataA')
|
||||
writer.put({'X-Timestamp': ts.internal})
|
||||
events[0].set()
|
||||
events[1].wait()
|
||||
writer.commit(ts)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
raise
|
||||
|
||||
def threadB(df, events, errors):
|
||||
try:
|
||||
events[0].wait()
|
||||
df.delete(self.ts())
|
||||
events[1].set()
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
raise
|
||||
|
||||
df = self._simple_get_diskfile()
|
||||
events = [threading.Event(), threading.Event()]
|
||||
errors = []
|
||||
|
||||
threads = [threading.Thread(target=tgt, args=(df, events, errors))
|
||||
for tgt in (threadA, threadB)]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
self.assertFalse(errors)
|
||||
|
||||
self.assertRaises(DiskFileDeleted, df.open)
|
||||
|
||||
def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024,
|
||||
csize=8, mark_deleted=False, prealloc=False,
|
||||
ts=None, mount_check=False, extra_metadata=None,
|
||||
|
Loading…
Reference in New Issue
Block a user