Merge "Fix non-deterministic suffix updates in hashes.pkl"
commit 8737ebe519
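The gist of the change: previously the replicator passed do_listdir=(self.replication_count % 10) == 0 into the hash recalculation, so whether a given partition got a fresh listdir depended on how many partitions had already been processed in that run, which varies from run to run. The new module-level _do_listdir(partition, replication_cycle) ties the decision to the partition number plus a cycle counter that advances once per replication run, so every partition is re-listed exactly once every ten cycles. A minimal sketch, not part of the commit, demonstrating that property:

    def _do_listdir(partition, replication_cycle):
        # A partition is re-listed when its number plus the current cycle
        # is a multiple of 10, i.e. exactly once every 10 cycles.
        return (((partition + replication_cycle) % 10) == 0)

    partitions = range(25)
    seen = []
    for cycle in range(10):  # replication_cycle advances once per run
        seen += [p for p in partitions if _do_listdir(p, cycle)]

    assert sorted(seen) == list(partitions)  # each partition seen exactly once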
@@ -1066,6 +1066,7 @@ class BaseDiskFileManager(object):
                 if len(suff) == 3:
                     hashes.setdefault(suff, None)
             modified = True
+            self.logger.debug('Run listdir on %s', partition_path)
         hashes.update((suffix, None) for suffix in recalculate)
         for suffix, hash_ in hashes.items():
             if not hash_:
@@ -46,6 +46,10 @@ DEFAULT_RSYNC_TIMEOUT = 900
 hubs.use_hub(get_hub())
 
 
+def _do_listdir(partition, replication_cycle):
+    return (((partition + replication_cycle) % 10) == 0)
+
+
 class ObjectReplicator(Daemon):
     """
     Replicate objects.
@@ -74,6 +78,7 @@ class ObjectReplicator(Daemon):
         self.ring_check_interval = int(conf.get('ring_check_interval', 15))
         self.next_check = time.time() + self.ring_check_interval
         self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
+        self.replication_cycle = random.randint(0, 9)
         self.partition_times = []
         self.interval = int(conf.get('interval') or
                             conf.get('run_pause') or 30)
@@ -404,7 +409,9 @@ class ObjectReplicator(Daemon):
         try:
             hashed, local_hash = tpool_reraise(
                 self._diskfile_mgr._get_hashes, job['path'],
-                do_listdir=(self.replication_count % 10) == 0,
+                do_listdir=_do_listdir(
+                    int(job['partition']),
+                    self.replication_cycle),
                 reclaim_age=self.reclaim_age)
             self.suffix_hash += hashed
             self.logger.update_stats('suffix.hashes', hashed)
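One detail in the hunk above: the partition is coerced with int() before the arithmetic. The assumption here, which matches the int(job['partition']) calls in the tests below, is that the replicator's job dict carries the partition as a string taken from a directory name. A toy illustration with a hypothetical job dict:

    # 'job' is a hypothetical stand-in for the replicator's job dict;
    # 'partition' is a string because it is read from a directory name.
    job = {'partition': '1086', 'path': '/srv/node/sda1/objects/1086'}

    replication_cycle = 4
    do_listdir = ((int(job['partition']) + replication_cycle) % 10) == 0
    assert do_listdir  # (1086 + 4) % 10 == 0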
@@ -692,6 +699,7 @@ class ObjectReplicator(Daemon):
         self.suffix_hash = 0
         self.replication_count = 0
         self.last_replication_count = -1
+        self.replication_cycle = (self.replication_cycle + 1) % 10
         self.partition_times = []
         self.my_replication_ips = self._get_my_replication_ips()
         self.all_devs_info = set()
@@ -312,11 +312,20 @@ class TestObjectReplicator(unittest.TestCase):
             for node in nodes:
                 process_arg_checker.append(
                     (0, '', ['rsync', whole_path_from, rsync_mods]))
+        start = replicator.replication_cycle
+        self.assertGreaterEqual(start, 0)
+        self.assertLess(start, 9)
         with _mock_process(process_arg_checker):
             replicator.run_once()
+        self.assertEqual(start + 1, replicator.replication_cycle)
         self.assertFalse(process_errors)
         self.assertFalse(self.logger.get_lines_for_level('error'))
         object_replicator.http_connect = was_connector
+        with _mock_process(process_arg_checker):
+            for cycle in range(1, 10):
+                replicator.run_once()
+        self.assertEqual((start + 1 + cycle) % 10,
+                         replicator.replication_cycle)
 
     # policy 1
     def test_run_once_1(self):
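The new assertions above pin down the cycle bookkeeping: one run_once() advances replication_cycle by exactly one, and ten runs wrap it back around modulo 10. A standalone sketch of that invariant, assuming run_once() performs exactly one increment per call, which is what the assertions check:

    import random

    replication_cycle = random.randint(0, 9)  # as in ObjectReplicator.__init__
    start = replication_cycle
    for _ in range(10):                       # ten replication runs
        replication_cycle = (replication_cycle + 1) % 10
    assert replication_cycle == start         # wraps back after ten runs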
@@ -1695,9 +1704,10 @@ class TestObjectReplicator(unittest.TestCase):
         self.replicator.sync_method.assert_called_once_with(
             'node', 'job', 'suffixes')
 
-    @mock.patch('swift.obj.replicator.tpool_reraise', autospec=True)
+    @mock.patch('swift.obj.replicator.tpool_reraise')
     @mock.patch('swift.obj.replicator.http_connect', autospec=True)
-    def test_update(self, mock_http, mock_tpool_reraise):
+    @mock.patch('swift.obj.replicator._do_listdir')
+    def test_update(self, mock_do_listdir, mock_http, mock_tpool_reraise):
 
         def set_default(self):
             self.replicator.suffix_count = 0
@@ -1715,11 +1725,23 @@ class TestObjectReplicator(unittest.TestCase):
 
         mock_http.return_value = answer = mock.MagicMock()
         answer.getresponse.return_value = resp = mock.MagicMock()
-        # Check uncorrect http_connect with status 507 and
+        # Check incorrect http_connect with status 507 and
         # count of attempts and call args
         resp.status = 507
         error = '%(replication_ip)s/%(device)s responded as unmounted'
         expect = 'Error syncing partition: '
+        expected_listdir_calls = [
+            mock.call(int(job['partition']),
+                      self.replicator.replication_cycle)
+            for job in jobs]
+        do_listdir_results = [False, False, True, False, True, False]
+        mock_do_listdir.side_effect = do_listdir_results
+        expected_tpool_calls = [
+            mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'],
+                      do_listdir=do_listdir,
+                      reclaim_age=self.replicator.reclaim_age)
+            for job, do_listdir in zip(jobs, do_listdir_results)
+        ]
         for job in jobs:
             set_default(self)
             ring = job['policy'].object_ring
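Design note on the test setup above: assigning a list to mock_do_listdir.side_effect makes the mock return those values one per call, so the test scripts exactly which jobs will listdir and can later assert that tpool_reraise received the matching do_listdir flags. This is stock unittest.mock behavior, illustrated standalone:

    from unittest import mock

    m = mock.Mock(side_effect=[False, True, False])
    assert [m() for _ in range(3)] == [False, True, False]

    # Clearing side_effect and setting return_value, as the test does
    # before the status-400 block, makes every later call return False.
    m.side_effect = None
    m.return_value = False
    assert m() is False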
@@ -1749,8 +1771,11 @@ class TestObjectReplicator(unittest.TestCase):
             mock_http.assert_has_calls(reqs, any_order=True)
             mock_http.reset_mock()
             self.logger.clear()
 
-        # Check uncorrect http_connect with status 400 != HTTP_OK
+        mock_do_listdir.assert_has_calls(expected_listdir_calls)
+        mock_tpool_reraise.assert_has_calls(expected_tpool_calls)
+        mock_do_listdir.side_effect = None
+        mock_do_listdir.return_value = False
+        # Check incorrect http_connect with status 400 != HTTP_OK
         resp.status = 400
         error = 'Invalid response %(resp)s from %(ip)s'
         for job in jobs:
@@ -1765,7 +1790,7 @@ class TestObjectReplicator(unittest.TestCase):
             self.logger.clear()
 
         # Check successful http_connection and exception with
-        # uncorrect pickle.loads(resp.read())
+        # incorrect pickle.loads(resp.read())
         resp.status = 200
         expect = 'Error syncing with node: %r: '
         for job in jobs:
@@ -1896,6 +1921,29 @@ class TestObjectReplicator(unittest.TestCase):
                 _m_os_path_exists.call_args_list[-2][0][0],
                 os.path.join(job['path']))
 
+    def test_do_listdir(self):
+        # Test that do_listdir is enabled for every 10th partition to rehash
+        # Dict key is the number of partitions in the job; list entries are
+        # the expected number of partitions with do_listdir enabled per cycle
+        test_data = {
+            9: [1, 0, 1, 1, 1, 1, 1, 1, 1, 1],
+            29: [3, 2, 3, 3, 3, 3, 3, 3, 3, 3],
+            111: [12, 11, 11, 11, 11, 11, 11, 11, 11, 11]}
+
+        for partitions, expected in test_data.items():
+            seen = []
+            for phase in range(10):
+                invalidated = 0
+                for partition in range(partitions):
+                    if object_replicator._do_listdir(partition, phase):
+                        seen.append(partition)
+                        invalidated += 1
+                # Every 10th partition is seen after each phase
+                self.assertEqual(expected[phase], invalidated)
+
+            # After 10 cycles every partition is seen exactly once
+            self.assertEqual(sorted(range(partitions)), sorted(seen))
+
 
 if __name__ == '__main__':
     unittest.main()
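As a sanity check on the expected counts in test_do_listdir: with 29 partitions, phase 0 selects partitions 0, 10 and 20 (three of them), phase 1 selects 9 and 19 (two), and every later phase selects three again, which is the [3, 2, 3, ...] row in test_data. The same row recomputed directly:

    def _do_listdir(partition, replication_cycle):
        return (((partition + replication_cycle) % 10) == 0)

    counts = [sum(_do_listdir(p, phase) for p in range(29))
              for phase in range(10)]
    assert counts == [3, 2, 3, 3, 3, 3, 3, 3, 3, 3]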