Merge "Improve object-replicator startup time."

This commit is contained in:
Jenkins 2015-01-22 11:54:01 +00:00 committed by Gerrit Code Review
commit 7def1d2921
2 changed files with 58 additions and 63 deletions

View File

@ -432,14 +432,6 @@ class ObjectReplicator(Daemon):
for partition in os.listdir(obj_path):
try:
job_path = join(obj_path, partition)
if isfile(job_path):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job_path)
os.remove(job_path)
continue
part_nodes = obj_ring.get_part_nodes(int(partition))
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
@ -451,8 +443,7 @@ class ObjectReplicator(Daemon):
policy_idx=policy.idx,
partition=partition,
object_ring=obj_ring))
except (ValueError, OSError):
except ValueError:
continue
def collect_jobs(self):
@ -508,6 +499,17 @@ class ObjectReplicator(Daemon):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return
try:
if isfile(job['path']):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job['path'])
os.remove(job['path'])
continue
except OSError:
continue
if job['delete']:
self.run_pool.spawn(self.update_deleted, job)
else:

View File

@ -321,64 +321,57 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(jobs[0]['delete'])
self.assertEquals('1', jobs[0]['partition'])
def test_collect_jobs_removes_zbf(self):
"""
After running xfs_repair, a partition directory could become a
zero-byte file. If this happens, collect_jobs() should clean it up and
*not* create a job which will hit an exception as it tries to listdir()
a file.
"""
# Surprise! Partition dir 1 is actually a zero-byte-file
part_1_path = os.path.join(self.objects, '1')
rmtree(part_1_path)
with open(part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(part_1_path)) # sanity check
part_1_path_1 = os.path.join(self.objects_1, '1')
rmtree(part_1_path_1)
with open(part_1_path_1, 'w'):
pass
self.assertTrue(os.path.isfile(part_1_path_1)) # sanity check
def test_replicator_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the replicator
rmtree(self.objects)
rmtree(self.objects_1)
os.mkdir(self.objects)
os.mkdir(self.objects_1)
os.mkdir(os.path.join(self.objects, "burrito"))
jobs = self.replicator.collect_jobs()
jobs_to_delete = [j for j in jobs if j['delete']]
jobs_by_pol_part = {}
for job in jobs:
jobs_by_pol_part[str(job['policy_idx']) + job['partition']] = job
self.assertEquals(len(jobs_to_delete), 0)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['00']['nodes']], [1, 2])
self.assertFalse('1' in jobs_by_pol_part)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['02']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['03']['nodes']], [3, 1])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['10']['nodes']], [1, 2])
self.assertFalse('1' in jobs_by_pol_part)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
for part in ['00', '02', '03']:
for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects, part[1:]))
self.assertFalse(os.path.exists(part_1_path))
expected = sorted(self.replicator.logger.log_dict['warning'])
self.assertEqual(len(jobs), 0)
def test_replicator_removes_zbf(self):
# After running xfs_repair, a partition directory could become a
# zero-byte file. If this happens, the replicator should clean it
# up, log something, and move on to the next partition.
# Surprise! Partition dir 1 is actually a zero-byte file.
pol_0_part_1_path = os.path.join(self.objects, '1')
rmtree(pol_0_part_1_path)
with open(pol_0_part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(pol_0_part_1_path)) # sanity check
# Policy 1's partition dir 1 is also a zero-byte file.
pol_1_part_1_path = os.path.join(self.objects_1, '1')
rmtree(pol_1_part_1_path)
with open(pol_1_part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
# Don't delete things in collect_jobs(); all the stat() calls would
# make replicator startup really slow.
self.replicator.collect_jobs()
self.assertTrue(os.path.exists(pol_0_part_1_path))
self.assertTrue(os.path.exists(pol_1_part_1_path))
# After a replication pass, the files should be gone
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.run_once()
self.assertFalse(os.path.exists(pol_0_part_1_path))
self.assertFalse(os.path.exists(pol_1_part_1_path))
logged_warnings = sorted(self.replicator.logger.log_dict['warning'])
self.assertEquals(
(('Removing partition directory which was a file: %s',
part_1_path), {}), expected[1])
# policy 1
for part in ['10', '12', '13']:
for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects_1, part[1:]))
self.assertFalse(os.path.exists(part_1_path_1))
pol_1_part_1_path), {}), logged_warnings[0])
self.assertEquals(
(('Removing partition directory which was a file: %s',
part_1_path_1), {}), expected[0])
pol_0_part_1_path), {}), logged_warnings[1])
def test_delete_partition(self):
with mock.patch('swift.obj.replicator.http_connect',