From 69b8165cd87693f077a2385d9b6d53a7f459b098 Mon Sep 17 00:00:00 2001
From: Tim Burke
Date: Wed, 22 Apr 2020 12:33:10 -0700
Subject: [PATCH] obj: _finalize_durable may succeed even when data file is missing

Ordinarily, an ENOENT in _finalize_durable should mean something's gone
off the rails -- we expected to be able to mark data durable, but
couldn't!

If there are concurrent writers, though, it might actually be OK:

    Client A writes .data
    Client B writes .data
    Client B finalizes .data *and cleans up on-disk files*
    Client A tries to finalize but .data is gone

Previously, the above would cause the object server to 500, and if
enough of them did this, the client may see a 503. Now, call it good so
clients get 201s.

Change-Id: I4e322a7be23870a62aaa6acee8435598a056c544
Closes-Bug: #1719860
---
 swift/obj/diskfile.py          | 21 ++++++++-
 test/unit/obj/test_diskfile.py | 86 ++++++++++++++++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 3c3c2bd5a6..68f23b3e45 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -3031,7 +3031,8 @@ class ECDiskFileReader(BaseDiskFileReader):
 
 class ECDiskFileWriter(BaseDiskFileWriter):
 
-    def _finalize_durable(self, data_file_path, durable_data_file_path):
+    def _finalize_durable(self, data_file_path, durable_data_file_path,
+                          timestamp):
         exc = None
         new_data_file_path = new_durable_data_file_path = None
         if self.next_part_power:
@@ -3055,6 +3056,21 @@ class ECDiskFileWriter(BaseDiskFileWriter):
                             exc)
 
             except (OSError, IOError) as err:
+                if err.errno == errno.ENOENT:
+                    files = os.listdir(self._datadir)
+                    results = self.manager.get_ondisk_files(
+                        files, self._datadir,
+                        frag_index=self._diskfile._frag_index,
+                        policy=self._diskfile.policy)
+                    # We "succeeded" if another writer cleaned up our data
+                    ts_info = results.get('ts_info')
+                    durables = results.get('durable_frag_set', [])
+                    if ts_info and ts_info['timestamp'] > timestamp:
+                        return
+                    elif any(frag_set['timestamp'] > timestamp
+                             for frag_set in durables):
+                        return
+
                 if err.errno not in (errno.ENOSPC, errno.EDQUOT):
                     # re-raise to catch all handler
                     raise
@@ -3102,7 +3118,8 @@ class ECDiskFileWriter(BaseDiskFileWriter):
             self._datadir, self.manager.make_on_disk_filename(
                 timestamp, '.data', self._diskfile._frag_index, durable=True))
         tpool.execute(
-            self._finalize_durable, data_file_path, durable_data_file_path)
+            self._finalize_durable, data_file_path, durable_data_file_path,
+            timestamp)
 
     def put(self, metadata):
         """
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 2adf631a7e..6d1a53b512 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -24,6 +24,7 @@ import mock
 import unittest
 import email
 import tempfile
+import threading
 import uuid
 import xattr
 import re
@@ -3705,6 +3706,91 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         # can close again
         writer.close()
 
+    def test_disk_file_concurrent_writes(self):
+        def threadA(df, events, errors):
+            try:
+                ts = self.ts()
+                with df.create() as writer:
+                    writer.write(b'dataA')
+                    writer.put({
+                        'X-Timestamp': ts.internal,
+                        'Content-Length': 5,
+                    })
+                    events[0].set()
+                    events[1].wait()
+                    writer.commit(ts)
+            except Exception as e:
+                errors.append(e)
+                raise
+
+        def threadB(df, events, errors):
+            try:
+                events[0].wait()
+                ts = self.ts()
+                with df.create() as writer:
+                    writer.write(b'dataB')
+                    writer.put({
+                        'X-Timestamp': ts.internal,
+                        'Content-Length': 5,
+                    })
+                    writer.commit(ts)
+                events[1].set()
+            except Exception as e:
+                errors.append(e)
+                raise
+
+        df = self._simple_get_diskfile()
+        events = [threading.Event(), threading.Event()]
+        errors = []
+
+        threads = [threading.Thread(target=tgt, args=(df, events, errors))
+                   for tgt in (threadA, threadB)]
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+        self.assertFalse(errors)
+
+        with df.open(), open(df._data_file, 'rb') as fp:
+            self.assertEqual(b'dataB', fp.read())
+
+    def test_disk_file_concurrent_delete(self):
+        def threadA(df, events, errors):
+            try:
+                ts = self.ts()
+                with df.create() as writer:
+                    writer.write(b'dataA')
+                    writer.put({'X-Timestamp': ts.internal})
+                    events[0].set()
+                    events[1].wait()
+                    writer.commit(ts)
+            except Exception as e:
+                errors.append(e)
+                raise
+
+        def threadB(df, events, errors):
+            try:
+                events[0].wait()
+                df.delete(self.ts())
+                events[1].set()
+            except Exception as e:
+                errors.append(e)
+                raise
+
+        df = self._simple_get_diskfile()
+        events = [threading.Event(), threading.Event()]
+        errors = []
+
+        threads = [threading.Thread(target=tgt, args=(df, events, errors))
+                   for tgt in (threadA, threadB)]
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+        self.assertFalse(errors)
+
+        self.assertRaises(DiskFileDeleted, df.open)
+
     def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024,
                             csize=8, mark_deleted=False, prealloc=False,
                             ts=None, mount_check=False, extra_metadata=None,