diff --git a/swift/common/utils.py b/swift/common/utils.py
index f89b187eec..d765d122f7 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2585,7 +2585,7 @@ def _get_any_lock(fds):
 
 
 @contextmanager
-def lock_path(directory, timeout=10, timeout_class=None, limit=1):
+def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
     """
     Context manager that acquires a lock on a directory. This will block until
     the lock can be acquired, or the timeout time has expired (whichever occurs
@@ -2605,6 +2605,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
         the same directory at the time this method is called. Note that this
         limit is only applied during the current call to this method and does
         not prevent subsequent calls giving a larger limit. Defaults to 1.
+    :param name: A string used to distinguish different types of locks in a
+        directory
     :raises TypeError: if limit is not an int.
     :raises ValueError: if limit is less than 1.
     """
@@ -2614,6 +2616,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
         timeout_class = swift.common.exceptions.LockTimeout
     mkdirs(directory)
     lockpath = '%s/.lock' % directory
+    if name:
+        lockpath += '-%s' % str(name)
     fds = [os.open(get_zero_indexed_base_string(lockpath, i),
                    os.O_WRONLY | os.O_CREAT)
            for i in range(limit)]
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 2ba0516c2b..53957bae6a 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -1267,23 +1267,34 @@ class BaseDiskFileManager(object):
         return None
 
     @contextmanager
-    def replication_lock(self, device):
+    def replication_lock(self, device, policy, partition):
         """
         A context manager that will lock on the device given, if
         configured to do so.
 
         :param device: name of target device
+        :param policy: policy targeted by the replication request
+        :param partition: partition targeted by the replication request
         :raises ReplicationLockTimeout: If the lock on the device cannot be
             granted within the configured timeout.
         """
         if self.replication_concurrency_per_device:
             dev_path = self.get_dev_path(device)
+            part_path = os.path.join(dev_path, get_data_dir(policy),
+                                     str(partition))
+            limit_time = time.time() + self.replication_lock_timeout
             with lock_path(
                     dev_path,
                     timeout=self.replication_lock_timeout,
                     timeout_class=ReplicationLockTimeout,
                     limit=self.replication_concurrency_per_device):
-                yield True
+                with lock_path(
+                        part_path,
+                        timeout=limit_time - time.time(),
+                        timeout_class=ReplicationLockTimeout,
+                        limit=1,
+                        name='replication'):
+                    yield True
         else:
             yield True
 
@@ -1574,6 +1585,8 @@ class BaseDiskFileManager(object):
             # This lock is only held by people dealing with the hashes
             # or the hash invalidations, and we've just removed those.
             _unlink_if_present(os.path.join(partition_path, ".lock"))
+            _unlink_if_present(os.path.join(partition_path,
+                                            ".lock-replication"))
             os.rmdir(partition_path)
         except OSError as err:
             self.logger.debug("Error cleaning up empty partition: %s", err)
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 99a356ea27..68844dd01e 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -158,7 +158,9 @@ class Receiver(object):
                     if not self.app.replication_semaphore.acquire(False):
                         raise swob.HTTPServiceUnavailable()
                 try:
-                    with self.diskfile_mgr.replication_lock(self.device):
+                    with self.diskfile_mgr.replication_lock(self.device,
+                                                            self.policy,
+                                                            self.partition):
                         for data in self.missing_check():
                             yield data
                         for data in self.updates():
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 88a6ea2e18..f04470d658 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -1072,6 +1072,28 @@ class TestUtils(unittest.TestCase):
         self.assertTrue(exc2 is not None)
         self.assertTrue(not success)
 
+    @with_tempdir
+    def test_lock_path_name(self, tmpdir):
+        # With default limit (1), can't take the same named lock twice
+        success = False
+        with utils.lock_path(tmpdir, 0.1, name='foo'):
+            with self.assertRaises(LockTimeout):
+                with utils.lock_path(tmpdir, 0.1, name='foo'):
+                    success = True
+        self.assertFalse(success)
+        # With default limit (1), can take two differently named locks
+        success = False
+        with utils.lock_path(tmpdir, 0.1, name='foo'):
+            with utils.lock_path(tmpdir, 0.1, name='bar'):
+                success = True
+        self.assertTrue(success)
+        # With default limit (1), can take a named lock and the default lock
+        success = False
+        with utils.lock_path(tmpdir, 0.1, name='foo'):
+            with utils.lock_path(tmpdir, 0.1):
+                success = True
+        self.assertTrue(success)
+
     def test_normalize_timestamp(self):
         # Test swift.common.utils.normalize_timestamp
         self.assertEqual(utils.normalize_timestamp('1253327593.48174'),
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 7396f4b444..0ca1326e29 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -1080,9 +1080,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         self.df_mgr.replication_concurrency_per_device = 1
         self.df_mgr.replication_lock_timeout = 0.1
         success = False
-        with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
             with self.assertRaises(ReplicationLockTimeout):
-                with self.df_mgr.replication_lock(self.existing_device):
+                with self.df_mgr.replication_lock(self.existing_device,
+                                                  POLICIES.legacy, '2'):
                     success = True
         self.assertFalse(success)
 
@@ -1093,9 +1095,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
         # 2 locks must succeed
         success = False
-        with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
             try:
-                with self.df_mgr.replication_lock(self.existing_device):
+                with self.df_mgr.replication_lock(self.existing_device,
+                                                  POLICIES.legacy, '2'):
                     success = True
             except ReplicationLockTimeout as err:
                 self.fail('Unexpected exception: %s' % err)
@@ -1103,10 +1107,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
         # 3 locks must succeed
         success = False
-        with self.df_mgr.replication_lock(self.existing_device):
-            with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
+            with self.df_mgr.replication_lock(self.existing_device,
+                                              POLICIES.legacy, '2'):
                 try:
-                    with self.df_mgr.replication_lock(self.existing_device):
+                    with self.df_mgr.replication_lock(self.existing_device,
+                                                      POLICIES.legacy, '3'):
                         success = True
                 except ReplicationLockTimeout as err:
                     self.fail('Unexpected exception: %s' % err)
@@ -1119,9 +1126,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
         # 2 locks with replication_concurrency_per_device=2 must succeed
         success = False
-        with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
             try:
-                with self.df_mgr.replication_lock(self.existing_device):
+                with self.df_mgr.replication_lock(self.existing_device,
+                                                  POLICIES.legacy, '2'):
                     success = True
             except ReplicationLockTimeout as err:
                 self.fail('Unexpected exception: %s' % err)
@@ -1129,10 +1138,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
         # 3 locks with replication_concurrency_per_device=2 must fail
        success = False
-        with self.df_mgr.replication_lock(self.existing_device):
-            with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
+            with self.df_mgr.replication_lock(self.existing_device,
+                                              POLICIES.legacy, '2'):
                 with self.assertRaises(ReplicationLockTimeout):
-                    with self.df_mgr.replication_lock(self.existing_device):
+                    with self.df_mgr.replication_lock(self.existing_device,
+                                                      POLICIES.legacy, '3'):
                         success = True
         self.assertFalse(success)
 
@@ -1141,14 +1153,29 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         self.df_mgr.replication_concurrency_per_device = 1
         self.df_mgr.replication_lock_timeout = 0.1
         success = False
-        with self.df_mgr.replication_lock(self.existing_device):
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
             try:
-                with self.df_mgr.replication_lock(self.existing_device2):
+                with self.df_mgr.replication_lock(self.existing_device2,
+                                                  POLICIES.legacy, '2'):
                     success = True
             except ReplicationLockTimeout as err:
                 self.fail('Unexpected exception: %s' % err)
         self.assertTrue(success)
 
+    def test_replication_lock_same_partition(self):
+        # Double check settings
+        self.df_mgr.replication_concurrency_per_device = 2
+        self.df_mgr.replication_lock_timeout = 0.1
+        success = False
+        with self.df_mgr.replication_lock(self.existing_device,
+                                          POLICIES.legacy, '1'):
+            with self.assertRaises(ReplicationLockTimeout):
+                with self.df_mgr.replication_lock(self.existing_device,
+                                                  POLICIES.legacy, '1'):
+                    success = True
+        self.assertFalse(success)
+
     def test_missing_splice_warning(self):
         with mock.patch('swift.common.splice.splice._c_splice', None):
             self.conf['splice'] = 'yes'
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 88ad9bfe19..90c309ae18 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import os
 import shutil
 import tempfile
@@ -131,7 +132,9 @@ class TestReceiver(unittest.TestCase):
             [':MISSING_CHECK: START', ':MISSING_CHECK: END',
              ':UPDATES: START', ':UPDATES: END'])
         self.assertEqual(resp.status_int, 200)
-        mocked_replication_lock.assert_called_once_with('sda1')
+        mocked_replication_lock.assert_called_once_with('sda1',
+                                                        POLICIES.legacy,
+                                                        '1')
 
     def test_Receiver_with_default_storage_policy(self):
         req = swob.Request.blank(
@@ -290,7 +293,7 @@ class TestReceiver(unittest.TestCase):
             self.controller, req)
 
     def test_SSYNC_replication_lock_fail(self):
-        def _mock(path):
+        def _mock(path, policy, partition):
             with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
                 eventlet.sleep(0.05)
         with mock.patch.object(
@@ -311,6 +314,54 @@ class TestReceiver(unittest.TestCase):
             'None/sda1/1 SSYNC LOCK TIMEOUT: 0.01 seconds: '
             '/somewhere/sda1')
 
+    def test_SSYNC_replication_lock_per_partition(self):
+        def _concurrent_ssync(path1, path2):
+            env = {'REQUEST_METHOD': 'SSYNC'}
+            body = ':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' \
+                ':UPDATES: START\r\n:UPDATES: END\r\n'
+            req1 = swob.Request.blank(path1, environ=env, body=body)
+            req2 = swob.Request.blank(path2, environ=env, body=body)
+
+            rcvr1 = ssync_receiver.Receiver(self.controller, req1)
+            rcvr2 = ssync_receiver.Receiver(self.controller, req2)
+
+            body_lines1 = []
+            body_lines2 = []
+
+            for chunk1, chunk2 in itertools.izip_longest(rcvr1(), rcvr2()):
+                if chunk1 and chunk1.strip():
+                    body_lines1.append(chunk1.strip())
+                if chunk2 and chunk2.strip():
+                    body_lines2.append(chunk2.strip())
+
+            return body_lines1, body_lines2
+
+        self.controller._diskfile_router[POLICIES[0]]\
+            .replication_lock_timeout = 0.01
+        self.controller._diskfile_router[POLICIES[0]]\
+            .replication_concurrency_per_device = 2
+        # It should be possible to lock two different partitions
+        body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/2')
+        self.assertEqual(
+            body_lines1,
+            [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+        self.assertEqual(
+            body_lines2,
+            [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+
+        # It should not be possible to lock the same partition twice
+        body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/1')
+        self.assertEqual(
+            body_lines1,
+            [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+        self.assertRegexpMatches(
+            ''.join(body_lines2),
+            "^:ERROR: 0 '0\.0[0-9]+ seconds: "
+            "/.+/sda1/objects/1/.lock-replication'$")
+
     def test_SSYNC_initial_path(self):
         with mock.patch.object(
                 self.controller, 'replication_semaphore') as \