From 014d46f9a763278668903ea65093affa5bc74a78 Mon Sep 17 00:00:00 2001
From: Romain LE DISEZ
Date: Thu, 18 Oct 2018 14:46:38 +0200
Subject: [PATCH] Fix SSYNC concurrency on partition

Commit e199192caefef068b5bf57da8b878e0bc82e3453 introduced the ability
to run multiple SSYNC requests concurrently on a single device. It
lacks a safeguard to ensure that only one SSYNC request can be running
on a given partition at a time.

This commit updates replication_lock so that it first takes the device
lock, which may be held up to N times concurrently, and then takes a
single per-partition lock for the SSYNC request.

Change-Id: Id053ed7dd355d414d7920dda79a968a1c6677c14
---
 swift/common/utils.py                |  6 ++-
 swift/obj/diskfile.py                | 17 ++++++++-
 swift/obj/ssync_receiver.py          |  4 +-
 test/unit/common/test_utils.py       | 22 +++++++++++
 test/unit/obj/test_diskfile.py       | 55 +++++++++++++++++++++-------
 test/unit/obj/test_ssync_receiver.py | 55 +++++++++++++++++++++++++++-
 6 files changed, 139 insertions(+), 20 deletions(-)

diff --git a/swift/common/utils.py b/swift/common/utils.py
index 57b219c36a..32ea6bd66f 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2576,7 +2576,7 @@ def _get_any_lock(fds):
 
 
 @contextmanager
-def lock_path(directory, timeout=10, timeout_class=None, limit=1):
+def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
     """
     Context manager that acquires a lock on a directory. This will block until
     the lock can be acquired, or the timeout time has expired (whichever occurs
@@ -2596,6 +2596,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
         the same directory at the time this method is called. Note that this
         limit is only applied during the current call to this method and does
         not prevent subsequent calls giving a larger limit. Defaults to 1.
+    :param name: A string to distinguish different types of locks in a
+        directory
     :raises TypeError: if limit is not an int.
     :raises ValueError: if limit is less than 1.
     """
@@ -2605,6 +2607,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
         timeout_class = swift.common.exceptions.LockTimeout
     mkdirs(directory)
     lockpath = '%s/.lock' % directory
+    if name:
+        lockpath += '-%s' % str(name)
     fds = [os.open(get_zero_indexed_base_string(lockpath, i),
                    os.O_WRONLY | os.O_CREAT)
            for i in range(limit)]
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 987ec1d8d6..f260692950 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -1267,23 +1267,34 @@ class BaseDiskFileManager(object):
         return None
 
     @contextmanager
-    def replication_lock(self, device):
+    def replication_lock(self, device, policy, partition):
         """
         A context manager that will lock on the device given, if
         configured to do so.
 
         :param device: name of target device
+        :param policy: policy targeted by the replication request
+        :param partition: partition targeted by the replication request
         :raises ReplicationLockTimeout: If the lock on the device cannot be
             granted within the configured timeout.
""" if self.replication_concurrency_per_device: dev_path = self.get_dev_path(device) + part_path = os.path.join(dev_path, get_data_dir(policy), + str(partition)) + limit_time = time.time() + self.replication_lock_timeout with lock_path( dev_path, timeout=self.replication_lock_timeout, timeout_class=ReplicationLockTimeout, limit=self.replication_concurrency_per_device): - yield True + with lock_path( + part_path, + timeout=limit_time - time.time(), + timeout_class=ReplicationLockTimeout, + limit=1, + name='replication'): + yield True else: yield True @@ -1574,6 +1585,8 @@ class BaseDiskFileManager(object): # This lock is only held by people dealing with the hashes # or the hash invalidations, and we've just removed those. _unlink_if_present(os.path.join(partition_path, ".lock")) + _unlink_if_present(os.path.join(partition_path, + ".lock-replication")) os.rmdir(partition_path) except OSError as err: self.logger.debug("Error cleaning up empty partition: %s", err) diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 99a356ea27..68844dd01e 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -158,7 +158,9 @@ class Receiver(object): if not self.app.replication_semaphore.acquire(False): raise swob.HTTPServiceUnavailable() try: - with self.diskfile_mgr.replication_lock(self.device): + with self.diskfile_mgr.replication_lock(self.device, + self.policy, + self.partition): for data in self.missing_check(): yield data for data in self.updates(): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 6e94b20878..154c53e1f2 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1072,6 +1072,28 @@ class TestUtils(unittest.TestCase): self.assertTrue(exc2 is not None) self.assertTrue(not success) + @with_tempdir + def test_lock_path_name(self, tmpdir): + # With default limit (1), can't take the same named lock twice + success = False + with utils.lock_path(tmpdir, 0.1, name='foo'): + with self.assertRaises(LockTimeout): + with utils.lock_path(tmpdir, 0.1, name='foo'): + success = True + self.assertFalse(success) + # With default limit (1), can take two differently named locks + success = False + with utils.lock_path(tmpdir, 0.1, name='foo'): + with utils.lock_path(tmpdir, 0.1, name='bar'): + success = True + self.assertTrue(success) + # With default limit (1), can take a named lock and the default lock + success = False + with utils.lock_path(tmpdir, 0.1, name='foo'): + with utils.lock_path(tmpdir, 0.1): + success = True + self.assertTrue(success) + def test_normalize_timestamp(self): # Test swift.common.utils.normalize_timestamp self.assertEqual(utils.normalize_timestamp('1253327593.48174'), diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 12840bce04..63c37f6644 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -1080,9 +1080,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): self.df_mgr.replication_concurrency_per_device = 1 self.df_mgr.replication_lock_timeout = 0.1 success = False - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): with self.assertRaises(ReplicationLockTimeout): - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '2'): success = True self.assertFalse(success) @@ -1093,9 +1095,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): # 2 
locks must succeed success = False - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): try: - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '2'): success = True except ReplicationLockTimeout as err: self.fail('Unexpected exception: %s' % err) @@ -1103,10 +1107,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): # 3 locks must succeed success = False - with self.df_mgr.replication_lock(self.existing_device): - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '2'): try: - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '3'): success = True except ReplicationLockTimeout as err: self.fail('Unexpected exception: %s' % err) @@ -1119,9 +1126,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): # 2 locks with replication_concurrency_per_device=2 must succeed success = False - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): try: - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '2'): success = True except ReplicationLockTimeout as err: self.fail('Unexpected exception: %s' % err) @@ -1129,10 +1138,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): # 3 locks with replication_concurrency_per_device=2 must fail success = False - with self.df_mgr.replication_lock(self.existing_device): - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '2'): with self.assertRaises(ReplicationLockTimeout): - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '3'): success = True self.assertFalse(success) @@ -1141,14 +1153,29 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): self.df_mgr.replication_concurrency_per_device = 1 self.df_mgr.replication_lock_timeout = 0.1 success = False - with self.df_mgr.replication_lock(self.existing_device): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): try: - with self.df_mgr.replication_lock(self.existing_device2): + with self.df_mgr.replication_lock(self.existing_device2, + POLICIES.legacy, '2'): success = True except ReplicationLockTimeout as err: self.fail('Unexpected exception: %s' % err) self.assertTrue(success) + def test_replication_lock_same_partition(self): + # Double check settings + self.df_mgr.replication_concurrency_per_device = 2 + self.df_mgr.replication_lock_timeout = 0.1 + success = False + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): + with self.assertRaises(ReplicationLockTimeout): + with self.df_mgr.replication_lock(self.existing_device, + POLICIES.legacy, '1'): + success = True + self.assertFalse(success) + def test_missing_splice_warning(self): with mock.patch('swift.common.splice.splice._c_splice', None): self.conf['splice'] = 'yes' diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 
9ad5b619dd..461612c96b 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import itertools import os import shutil import tempfile @@ -131,7 +132,9 @@ class TestReceiver(unittest.TestCase): [':MISSING_CHECK: START', ':MISSING_CHECK: END', ':UPDATES: START', ':UPDATES: END']) self.assertEqual(resp.status_int, 200) - mocked_replication_lock.assert_called_once_with('sda1') + mocked_replication_lock.assert_called_once_with('sda1', + POLICIES.legacy, + '1') def test_Receiver_with_default_storage_policy(self): req = swob.Request.blank( @@ -290,7 +293,7 @@ class TestReceiver(unittest.TestCase): self.controller, req) def test_SSYNC_replication_lock_fail(self): - def _mock(path): + def _mock(path, policy, partition): with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path): eventlet.sleep(0.05) with mock.patch.object( @@ -311,6 +314,54 @@ class TestReceiver(unittest.TestCase): 'None/sda1/1 SSYNC LOCK TIMEOUT: 0.01 seconds: ' '/somewhere/sda1') + def test_SSYNC_replication_lock_per_partition(self): + def _concurrent_ssync(path1, path2): + env = {'REQUEST_METHOD': 'SSYNC'} + body = ':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' \ + ':UPDATES: START\r\n:UPDATES: END\r\n' + req1 = swob.Request.blank(path1, environ=env, body=body) + req2 = swob.Request.blank(path2, environ=env, body=body) + + rcvr1 = ssync_receiver.Receiver(self.controller, req1) + rcvr2 = ssync_receiver.Receiver(self.controller, req2) + + body_lines1 = [] + body_lines2 = [] + + for chunk1, chunk2 in itertools.izip_longest(rcvr1(), rcvr2()): + if chunk1 and chunk1.strip(): + body_lines1.append(chunk1.strip()) + if chunk2 and chunk2.strip(): + body_lines2.append(chunk2.strip()) + + return body_lines1, body_lines2 + + self.controller._diskfile_router[POLICIES[0]]\ + .replication_lock_timeout = 0.01 + self.controller._diskfile_router[POLICIES[0]]\ + .replication_concurrency_per_device = 2 + # It should be possible to lock two different partitions + body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/2') + self.assertEqual( + body_lines1, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual( + body_lines2, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + + # It should not be possible to lock the same partition twice + body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/1') + self.assertEqual( + body_lines1, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertRegexpMatches( + ''.join(body_lines2), + "^:ERROR: 0 '0\.0[0-9]+ seconds: " + "/.+/sda1/objects/1/.lock-replication'$") + def test_SSYNC_initial_path(self): with mock.patch.object( self.controller, 'replication_semaphore') as \