Improve object-replicator startup time.
The object replicator checks each partition directory to ensure it's really a directory and not a zero-byte file. This was happening in collect_jobs(), which is the first thing that the object replicator does.

The effect was that, at startup, the object-replicator process would list each "objects" or "objects-N" directory on each object device, then stat() every single thing in there. On devices with lots of partitions on them, this makes the replicator take a long time before it does anything useful.

If you have a cluster with a too-high part_power plus some failing disks elsewhere, you can easily get thousands of partition directories on each disk. If you've got 36 disks per node, that turns into a very long wait for the object replicator to do anything. Worse yet, if you add in a configuration management system that pushes new rings every couple of hours, the object replicator can spend the vast majority of its time collecting jobs, then only spend a short time doing useful work before the ring changes and it has to start all over again.

This commit moves the stat() call (os.path.isfile) to the loop that processes jobs. In a complete pass, the total work done is about the same, but the replicator starts doing useful work much sooner.

Change-Id: I5ed4cd09dde514ec7d1e74afe35feaab0cf28a10
parent cc2f0f4ed6
commit ba8114a513
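The pattern in miniature, as a minimal sketch rather than Swift's actual code (collect_jobs, process_jobs, and replicate below are illustrative stand-ins, not Swift's real APIs): job collection does a single listdir() per objects directory and defers the per-partition isfile() check, which is a stat() call, to the loop that actually processes jobs.

    import os


    def collect_jobs(obj_path):
        # One listdir() per "objects"/"objects-N" directory; no per-partition
        # stat() here, so job collection finishes quickly even with thousands
        # of partition directories.
        return [{'path': os.path.join(obj_path, partition)}
                for partition in os.listdir(obj_path)]


    def process_jobs(jobs):
        for job in jobs:
            try:
                # The deferred stat(): checked only when the job is about to
                # run, so its cost is spread across the whole pass instead of
                # all being paid at startup.
                if os.path.isfile(job['path']):
                    # Clean up a (probably zero-byte) file sitting where a
                    # partition directory should be, e.g. after xfs_repair.
                    os.remove(job['path'])
                    continue
            except OSError:
                continue
            replicate(job)


    def replicate(job):
        # Stand-in for the real per-partition replication work.
        print('replicating', job['path'])

The total number of stat() calls per complete pass is unchanged; they are simply interleaved with useful replication work instead of all happening before it.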
swift/obj/replicator.py
@@ -432,14 +432,6 @@ class ObjectReplicator(Daemon):
             for partition in os.listdir(obj_path):
                 try:
                     job_path = join(obj_path, partition)
-                    if isfile(job_path):
-                        # Clean up any (probably zero-byte) files where a
-                        # partition should be.
-                        self.logger.warning(
-                            'Removing partition directory '
-                            'which was a file: %s', job_path)
-                        os.remove(job_path)
-                        continue
                     part_nodes = obj_ring.get_part_nodes(int(partition))
                     nodes = [node for node in part_nodes
                              if node['id'] != local_dev['id']]
@@ -451,8 +443,7 @@ class ObjectReplicator(Daemon):
                              policy_idx=policy.idx,
                              partition=partition,
                              object_ring=obj_ring))
-                except (ValueError, OSError):
+                except ValueError:
                     continue

     def collect_jobs(self):
@@ -508,6 +499,17 @@ class ObjectReplicator(Daemon):
                 self.logger.info(_("Ring change detected. Aborting "
                                    "current replication pass."))
                 return
+            try:
+                if isfile(job['path']):
+                    # Clean up any (probably zero-byte) files where a
+                    # partition should be.
+                    self.logger.warning(
+                        'Removing partition directory '
+                        'which was a file: %s', job['path'])
+                    os.remove(job['path'])
+                    continue
+            except OSError:
+                continue
             if job['delete']:
                 self.run_pool.spawn(self.update_deleted, job)
             else:
test/unit/obj/test_replicator.py
@@ -321,64 +321,57 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertTrue(jobs[0]['delete'])
         self.assertEquals('1', jobs[0]['partition'])

-    def test_collect_jobs_removes_zbf(self):
-        """
-        After running xfs_repair, a partition directory could become a
-        zero-byte file. If this happens, collect_jobs() should clean it up and
-        *not* create a job which will hit an exception as it tries to listdir()
-        a file.
-        """
-        # Surprise! Partition dir 1 is actually a zero-byte-file
-        part_1_path = os.path.join(self.objects, '1')
-        rmtree(part_1_path)
-        with open(part_1_path, 'w'):
-            pass
-        self.assertTrue(os.path.isfile(part_1_path))  # sanity check
-        part_1_path_1 = os.path.join(self.objects_1, '1')
-        rmtree(part_1_path_1)
-        with open(part_1_path_1, 'w'):
-            pass
-        self.assertTrue(os.path.isfile(part_1_path_1))  # sanity check
-        jobs = self.replicator.collect_jobs()
-        jobs_to_delete = [j for j in jobs if j['delete']]
-        jobs_by_pol_part = {}
-        for job in jobs:
-            jobs_by_pol_part[str(job['policy_idx']) + job['partition']] = job
-        self.assertEquals(len(jobs_to_delete), 0)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['00']['nodes']], [1, 2])
-        self.assertFalse('1' in jobs_by_pol_part)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['02']['nodes']], [2, 3])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['03']['nodes']], [3, 1])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['10']['nodes']], [1, 2])
-        self.assertFalse('1' in jobs_by_pol_part)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
-        for part in ['00', '02', '03']:
-            for node in jobs_by_pol_part[part]['nodes']:
-                self.assertEquals(node['device'], 'sda')
-            self.assertEquals(jobs_by_pol_part[part]['path'],
-                              os.path.join(self.objects, part[1:]))
-        self.assertFalse(os.path.exists(part_1_path))
-        expected = sorted(self.replicator.logger.log_dict['warning'])
-        self.assertEquals(
-            (('Removing partition directory which was a file: %s',
-              part_1_path), {}), expected[1])
-        # policy 1
-        for part in ['10', '12', '13']:
-            for node in jobs_by_pol_part[part]['nodes']:
-                self.assertEquals(node['device'], 'sda')
-            self.assertEquals(jobs_by_pol_part[part]['path'],
-                              os.path.join(self.objects_1, part[1:]))
-        self.assertFalse(os.path.exists(part_1_path_1))
-        self.assertEquals(
-            (('Removing partition directory which was a file: %s',
-              part_1_path_1), {}), expected[0])
+    def test_replicator_skips_bogus_partition_dirs(self):
+        # A directory in the wrong place shouldn't crash the replicator
+        rmtree(self.objects)
+        rmtree(self.objects_1)
+        os.mkdir(self.objects)
+        os.mkdir(self.objects_1)
+
+        os.mkdir(os.path.join(self.objects, "burrito"))
+        jobs = self.replicator.collect_jobs()
+        self.assertEqual(len(jobs), 0)
+
+    def test_replicator_removes_zbf(self):
+        # After running xfs_repair, a partition directory could become a
+        # zero-byte file. If this happens, the replicator should clean it
+        # up, log something, and move on to the next partition.
+
+        # Surprise! Partition dir 1 is actually a zero-byte file.
+        pol_0_part_1_path = os.path.join(self.objects, '1')
+        rmtree(pol_0_part_1_path)
+        with open(pol_0_part_1_path, 'w'):
+            pass
+        self.assertTrue(os.path.isfile(pol_0_part_1_path))  # sanity check
+
+        # Policy 1's partition dir 1 is also a zero-byte file.
+        pol_1_part_1_path = os.path.join(self.objects_1, '1')
+        rmtree(pol_1_part_1_path)
+        with open(pol_1_part_1_path, 'w'):
+            pass
+        self.assertTrue(os.path.isfile(pol_1_part_1_path))  # sanity check
+
+        # Don't delete things in collect_jobs(); all the stat() calls would
+        # make replicator startup really slow.
+        self.replicator.collect_jobs()
+        self.assertTrue(os.path.exists(pol_0_part_1_path))
+        self.assertTrue(os.path.exists(pol_1_part_1_path))
+
+        # After a replication pass, the files should be gone
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            self.replicator.run_once()
+
+        self.assertFalse(os.path.exists(pol_0_part_1_path))
+        self.assertFalse(os.path.exists(pol_1_part_1_path))
+
+        logged_warnings = sorted(self.replicator.logger.log_dict['warning'])
+        self.assertEquals(
+            (('Removing partition directory which was a file: %s',
+              pol_1_part_1_path), {}), logged_warnings[0])
+        self.assertEquals(
+            (('Removing partition directory which was a file: %s',
+              pol_0_part_1_path), {}), logged_warnings[1])

     def test_delete_partition(self):
         with mock.patch('swift.obj.replicator.http_connect',