Merge "Fix SSYNC concurrency on partition"
This commit is contained in:
commit
b9d2c08e8d
@ -2585,7 +2585,7 @@ def _get_any_lock(fds):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def lock_path(directory, timeout=10, timeout_class=None, limit=1):
|
||||
def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
|
||||
"""
|
||||
Context manager that acquires a lock on a directory. This will block until
|
||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||
@ -2605,6 +2605,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
|
||||
the same directory at the time this method is called. Note that this
|
||||
limit is only applied during the current call to this method and does
|
||||
not prevent subsequent calls giving a larger limit. Defaults to 1.
|
||||
:param name: A string to distinguishes different type of locks in a
|
||||
directory
|
||||
:raises TypeError: if limit is not an int.
|
||||
:raises ValueError: if limit is less than 1.
|
||||
"""
|
||||
@ -2614,6 +2616,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
|
||||
timeout_class = swift.common.exceptions.LockTimeout
|
||||
mkdirs(directory)
|
||||
lockpath = '%s/.lock' % directory
|
||||
if name:
|
||||
lockpath += '-%s' % str(name)
|
||||
fds = [os.open(get_zero_indexed_base_string(lockpath, i),
|
||||
os.O_WRONLY | os.O_CREAT)
|
||||
for i in range(limit)]
|
||||
|
@ -1267,23 +1267,34 @@ class BaseDiskFileManager(object):
|
||||
return None
|
||||
|
||||
@contextmanager
|
||||
def replication_lock(self, device):
|
||||
def replication_lock(self, device, policy, partition):
|
||||
"""
|
||||
A context manager that will lock on the device given, if
|
||||
configured to do so.
|
||||
|
||||
:param device: name of target device
|
||||
:param policy: policy targeted by the replication request
|
||||
:param partition: partition targeted by the replication request
|
||||
:raises ReplicationLockTimeout: If the lock on the device
|
||||
cannot be granted within the configured timeout.
|
||||
"""
|
||||
if self.replication_concurrency_per_device:
|
||||
dev_path = self.get_dev_path(device)
|
||||
part_path = os.path.join(dev_path, get_data_dir(policy),
|
||||
str(partition))
|
||||
limit_time = time.time() + self.replication_lock_timeout
|
||||
with lock_path(
|
||||
dev_path,
|
||||
timeout=self.replication_lock_timeout,
|
||||
timeout_class=ReplicationLockTimeout,
|
||||
limit=self.replication_concurrency_per_device):
|
||||
yield True
|
||||
with lock_path(
|
||||
part_path,
|
||||
timeout=limit_time - time.time(),
|
||||
timeout_class=ReplicationLockTimeout,
|
||||
limit=1,
|
||||
name='replication'):
|
||||
yield True
|
||||
else:
|
||||
yield True
|
||||
|
||||
@ -1574,6 +1585,8 @@ class BaseDiskFileManager(object):
|
||||
# This lock is only held by people dealing with the hashes
|
||||
# or the hash invalidations, and we've just removed those.
|
||||
_unlink_if_present(os.path.join(partition_path, ".lock"))
|
||||
_unlink_if_present(os.path.join(partition_path,
|
||||
".lock-replication"))
|
||||
os.rmdir(partition_path)
|
||||
except OSError as err:
|
||||
self.logger.debug("Error cleaning up empty partition: %s", err)
|
||||
|
@ -158,7 +158,9 @@ class Receiver(object):
|
||||
if not self.app.replication_semaphore.acquire(False):
|
||||
raise swob.HTTPServiceUnavailable()
|
||||
try:
|
||||
with self.diskfile_mgr.replication_lock(self.device):
|
||||
with self.diskfile_mgr.replication_lock(self.device,
|
||||
self.policy,
|
||||
self.partition):
|
||||
for data in self.missing_check():
|
||||
yield data
|
||||
for data in self.updates():
|
||||
|
@ -1072,6 +1072,28 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertTrue(exc2 is not None)
|
||||
self.assertTrue(not success)
|
||||
|
||||
@with_tempdir
|
||||
def test_lock_path_name(self, tmpdir):
|
||||
# With default limit (1), can't take the same named lock twice
|
||||
success = False
|
||||
with utils.lock_path(tmpdir, 0.1, name='foo'):
|
||||
with self.assertRaises(LockTimeout):
|
||||
with utils.lock_path(tmpdir, 0.1, name='foo'):
|
||||
success = True
|
||||
self.assertFalse(success)
|
||||
# With default limit (1), can take two differently named locks
|
||||
success = False
|
||||
with utils.lock_path(tmpdir, 0.1, name='foo'):
|
||||
with utils.lock_path(tmpdir, 0.1, name='bar'):
|
||||
success = True
|
||||
self.assertTrue(success)
|
||||
# With default limit (1), can take a named lock and the default lock
|
||||
success = False
|
||||
with utils.lock_path(tmpdir, 0.1, name='foo'):
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
success = True
|
||||
self.assertTrue(success)
|
||||
|
||||
def test_normalize_timestamp(self):
|
||||
# Test swift.common.utils.normalize_timestamp
|
||||
self.assertEqual(utils.normalize_timestamp('1253327593.48174'),
|
||||
|
@ -1080,9 +1080,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.df_mgr.replication_concurrency_per_device = 1
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
with self.assertRaises(ReplicationLockTimeout):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '2'):
|
||||
success = True
|
||||
self.assertFalse(success)
|
||||
|
||||
@ -1093,9 +1095,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
# 2 locks must succeed
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '2'):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
self.fail('Unexpected exception: %s' % err)
|
||||
@ -1103,10 +1107,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
# 3 locks must succeed
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '2'):
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '3'):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
self.fail('Unexpected exception: %s' % err)
|
||||
@ -1119,9 +1126,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
# 2 locks with replication_concurrency_per_device=2 must succeed
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '2'):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
self.fail('Unexpected exception: %s' % err)
|
||||
@ -1129,10 +1138,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
# 3 locks with replication_concurrency_per_device=2 must fail
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '2'):
|
||||
with self.assertRaises(ReplicationLockTimeout):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '3'):
|
||||
success = True
|
||||
self.assertFalse(success)
|
||||
|
||||
@ -1141,14 +1153,29 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.df_mgr.replication_concurrency_per_device = 1
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device2):
|
||||
with self.df_mgr.replication_lock(self.existing_device2,
|
||||
POLICIES.legacy, '2'):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
self.fail('Unexpected exception: %s' % err)
|
||||
self.assertTrue(success)
|
||||
|
||||
def test_replication_lock_same_partition(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_concurrency_per_device = 2
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
success = False
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
with self.assertRaises(ReplicationLockTimeout):
|
||||
with self.df_mgr.replication_lock(self.existing_device,
|
||||
POLICIES.legacy, '1'):
|
||||
success = True
|
||||
self.assertFalse(success)
|
||||
|
||||
def test_missing_splice_warning(self):
|
||||
with mock.patch('swift.common.splice.splice._c_splice', None):
|
||||
self.conf['splice'] = 'yes'
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
@ -131,7 +132,9 @@ class TestReceiver(unittest.TestCase):
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
mocked_replication_lock.assert_called_once_with('sda1')
|
||||
mocked_replication_lock.assert_called_once_with('sda1',
|
||||
POLICIES.legacy,
|
||||
'1')
|
||||
|
||||
def test_Receiver_with_default_storage_policy(self):
|
||||
req = swob.Request.blank(
|
||||
@ -290,7 +293,7 @@ class TestReceiver(unittest.TestCase):
|
||||
self.controller, req)
|
||||
|
||||
def test_SSYNC_replication_lock_fail(self):
|
||||
def _mock(path):
|
||||
def _mock(path, policy, partition):
|
||||
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
|
||||
eventlet.sleep(0.05)
|
||||
with mock.patch.object(
|
||||
@ -311,6 +314,54 @@ class TestReceiver(unittest.TestCase):
|
||||
'None/sda1/1 SSYNC LOCK TIMEOUT: 0.01 seconds: '
|
||||
'/somewhere/sda1')
|
||||
|
||||
def test_SSYNC_replication_lock_per_partition(self):
|
||||
def _concurrent_ssync(path1, path2):
|
||||
env = {'REQUEST_METHOD': 'SSYNC'}
|
||||
body = ':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' \
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n'
|
||||
req1 = swob.Request.blank(path1, environ=env, body=body)
|
||||
req2 = swob.Request.blank(path2, environ=env, body=body)
|
||||
|
||||
rcvr1 = ssync_receiver.Receiver(self.controller, req1)
|
||||
rcvr2 = ssync_receiver.Receiver(self.controller, req2)
|
||||
|
||||
body_lines1 = []
|
||||
body_lines2 = []
|
||||
|
||||
for chunk1, chunk2 in itertools.izip_longest(rcvr1(), rcvr2()):
|
||||
if chunk1 and chunk1.strip():
|
||||
body_lines1.append(chunk1.strip())
|
||||
if chunk2 and chunk2.strip():
|
||||
body_lines2.append(chunk2.strip())
|
||||
|
||||
return body_lines1, body_lines2
|
||||
|
||||
self.controller._diskfile_router[POLICIES[0]]\
|
||||
.replication_lock_timeout = 0.01
|
||||
self.controller._diskfile_router[POLICIES[0]]\
|
||||
.replication_concurrency_per_device = 2
|
||||
# It should be possible to lock two different partitions
|
||||
body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/2')
|
||||
self.assertEqual(
|
||||
body_lines1,
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(
|
||||
body_lines2,
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
|
||||
# It should not be possible to lock the same partition twice
|
||||
body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/1')
|
||||
self.assertEqual(
|
||||
body_lines1,
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertRegexpMatches(
|
||||
''.join(body_lines2),
|
||||
"^:ERROR: 0 '0\.0[0-9]+ seconds: "
|
||||
"/.+/sda1/objects/1/.lock-replication'$")
|
||||
|
||||
def test_SSYNC_initial_path(self):
|
||||
with mock.patch.object(
|
||||
self.controller, 'replication_semaphore') as \
|
||||
|
Loading…
Reference in New Issue
Block a user