diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 3c3c2bd5a6..68f23b3e45 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -3031,7 +3031,8 @@ class ECDiskFileReader(BaseDiskFileReader): class ECDiskFileWriter(BaseDiskFileWriter): - def _finalize_durable(self, data_file_path, durable_data_file_path): + def _finalize_durable(self, data_file_path, durable_data_file_path, + timestamp): exc = None new_data_file_path = new_durable_data_file_path = None if self.next_part_power: @@ -3055,6 +3056,21 @@ class ECDiskFileWriter(BaseDiskFileWriter): exc) except (OSError, IOError) as err: + if err.errno == errno.ENOENT: + files = os.listdir(self._datadir) + results = self.manager.get_ondisk_files( + files, self._datadir, + frag_index=self._diskfile._frag_index, + policy=self._diskfile.policy) + # We "succeeded" if another writer cleaned up our data + ts_info = results.get('ts_info') + durables = results.get('durable_frag_set', []) + if ts_info and ts_info['timestamp'] > timestamp: + return + elif any(frag_set['timestamp'] > timestamp + for frag_set in durables): + return + if err.errno not in (errno.ENOSPC, errno.EDQUOT): # re-raise to catch all handler raise @@ -3102,7 +3118,8 @@ class ECDiskFileWriter(BaseDiskFileWriter): self._datadir, self.manager.make_on_disk_filename( timestamp, '.data', self._diskfile._frag_index, durable=True)) tpool.execute( - self._finalize_durable, data_file_path, durable_data_file_path) + self._finalize_durable, data_file_path, durable_data_file_path, + timestamp) def put(self, metadata): """ diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 2adf631a7e..6d1a53b512 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -24,6 +24,7 @@ import mock import unittest import email import tempfile +import threading import uuid import xattr import re @@ -3705,6 +3706,91 @@ class DiskFileMixin(BaseDiskFileTestMixin): # can close again writer.close() + def test_disk_file_concurrent_writes(self): + def threadA(df, events, errors): + try: + ts = self.ts() + with df.create() as writer: + writer.write(b'dataA') + writer.put({ + 'X-Timestamp': ts.internal, + 'Content-Length': 5, + }) + events[0].set() + events[1].wait() + writer.commit(ts) + except Exception as e: + errors.append(e) + raise + + def threadB(df, events, errors): + try: + events[0].wait() + ts = self.ts() + with df.create() as writer: + writer.write(b'dataB') + writer.put({ + 'X-Timestamp': ts.internal, + 'Content-Length': 5, + }) + writer.commit(ts) + events[1].set() + except Exception as e: + errors.append(e) + raise + + df = self._simple_get_diskfile() + events = [threading.Event(), threading.Event()] + errors = [] + + threads = [threading.Thread(target=tgt, args=(df, events, errors)) + for tgt in (threadA, threadB)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertFalse(errors) + + with df.open(), open(df._data_file, 'rb') as fp: + self.assertEqual(b'dataB', fp.read()) + + def test_disk_file_concurrent_delete(self): + def threadA(df, events, errors): + try: + ts = self.ts() + with df.create() as writer: + writer.write(b'dataA') + writer.put({'X-Timestamp': ts.internal}) + events[0].set() + events[1].wait() + writer.commit(ts) + except Exception as e: + errors.append(e) + raise + + def threadB(df, events, errors): + try: + events[0].wait() + df.delete(self.ts()) + events[1].set() + except Exception as e: + errors.append(e) + raise + + df = self._simple_get_diskfile() + events = [threading.Event(), threading.Event()] + errors = [] + + threads = [threading.Thread(target=tgt, args=(df, events, errors)) + for tgt in (threadA, threadB)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertFalse(errors) + + self.assertRaises(DiskFileDeleted, df.open) + def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024, csize=8, mark_deleted=False, prealloc=False, ts=None, mount_check=False, extra_metadata=None,