Make handoffs_first a more useful "mode"

Based on experience using handoffs_first and feedback from other
operators it has become clear that handoffs_first is only used during
periods of problematic cluster behavior (e.g. full disks) when
replication attempts are failing to quickly drain off the partitions
from the nodes which they have been rebalanced from.

In order to focus on the most important work (getting handoff partitions
off the node) handoffs_first mode will abort the current replication
sweep before attempting any primary suffix syncing if any of the handoff
partitions were not removed for any reason - and start over with
replication of handoffs jobs as the highest priority.

Note that handoffs_first being enabled will emit a warning on start up,
even if no handoff jobs fail, because of the negative impact it can have
during normal operations by dog piling on a node that was temporarily
unavailable.

Change-Id: Ia324728d42c606e2f9e7d29b4ab5fcbff6e47aea
This commit is contained in:
Clay Gerrard 2015-08-21 18:15:25 -07:00
parent 2f42dea12b
commit 01410129da
2 changed files with 119 additions and 0 deletions

View File

@ -281,6 +281,7 @@ class ObjectReplicator(Daemon):
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
failure_devs_info = set()
begin = time.time()
handoff_partition_deleted = False
try:
responses = []
suffixes = tpool.execute(tpool_get_suffixes, job['path'])
@ -347,8 +348,10 @@ class ObjectReplicator(Daemon):
for failure_dev in job['nodes']])
else:
self.delete_partition(job['path'])
handoff_partition_deleted = True
elif not suffixes:
self.delete_partition(job['path'])
handoff_partition_deleted = True
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
finally:
@ -357,6 +360,8 @@ class ObjectReplicator(Daemon):
for target_dev in job['nodes']])
self.stats['success'] += len(target_devs_info - failure_devs_info)
self._add_failure_stats(failure_devs_info)
if not handoff_partition_deleted:
self.handoffs_remaining += 1
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.delete.timing', begin)
@ -680,6 +685,7 @@ class ObjectReplicator(Daemon):
self.partition_times = []
self.my_replication_ips = self._get_my_replication_ips()
self.all_devs_info = set()
self.handoffs_remaining = 0
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
@ -705,6 +711,15 @@ class ObjectReplicator(Daemon):
for failure_dev in job['nodes']])
self.logger.warning(_('%s is not mounted'), job['device'])
continue
if self.handoffs_first and not job['delete']:
# in handoffs first mode, we won't process primary
# partitions until rebalance was successful!
if self.handoffs_remaining:
self.logger.warning(_(
"Handoffs first mode still has handoffs "
"remaining. Aborting current "
"replication pass."))
break
if not self.check_ring(job['policy'].object_ring):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))

View File

@ -765,6 +765,7 @@ class TestObjectReplicator(unittest.TestCase):
# all jobs processed!
self.assertEqual(self.replicator.job_count,
self.replicator.replication_count)
self.assertFalse(self.replicator.handoffs_remaining)
# sanity, all the handoffs suffixes we filled in were rsync'd
found_rsync_suffix_paths = set()
@ -795,6 +796,109 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEqual(dict(found_replicate_calls),
expected_replicate_calls)
def test_handoffs_first_mode_will_abort_if_handoffs_remaining(self):
# make an object in the handoff partition
handoff_suffix_paths = []
for policy in POLICIES:
ts = next(self.ts)
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
with df.create() as w:
w.write('asdf')
w.put({'X-Timestamp': ts.internal})
w.commit(ts)
handoff_suffix_paths.append(os.path.dirname(df._datadir))
process_arg_checker = [
# (return_code, stdout, <each in capture rsync args>)
(0, '', []),
(1, '', []),
(0, '', []),
(0, '', []),
(0, '', []),
(0, '', []),
]
stub_body = pickle.dumps({})
with _mock_process(process_arg_checker) as rsync_log, \
mock.patch('swift.obj.replicator.whataremyips',
side_effect=_ips), \
mocked_http_conn(*[200] * 5, body=stub_body) as conn_log:
self.replicator.handoffs_first = True
self.replicator.replicate()
# stopped after handoffs!
self.assertEqual(1, self.replicator.handoffs_remaining)
self.assertEqual(8, self.replicator.job_count)
# in addition to the two update_delted jobs as many as "concurrency"
# jobs may have been spawned into the pool before the failed
# update_deleted job incremented handoffs_remaining and caused the
# handoffs_first check to abort the current pass
self.assertLessEqual(self.replicator.replication_count,
2 + self.replicator.concurrency)
# sanity, all the handoffs suffixes we filled in were rsync'd
found_rsync_suffix_paths = set()
expected_replicate_requests = set()
for subprocess_info in rsync_log:
local_path, remote_path = subprocess_info['rsync_args'][-2:]
found_rsync_suffix_paths.add(local_path)
if subprocess_info['ret_code'] == 0:
node_ip = remote_path.split(':', 1)[0]
expected_replicate_requests.add(node_ip)
self.assertEqual(set(handoff_suffix_paths), found_rsync_suffix_paths)
# sanity, all successful rsync nodes got REPLCIATE requests
found_replicate_requests = set()
self.assertEqual(5, len(conn_log.requests))
for req in conn_log.requests:
self.assertEqual(req['method'], 'REPLICATE')
found_replicate_requests.add(req['ip'])
self.assertEqual(expected_replicate_requests,
found_replicate_requests)
# and at least one partition got removed!
remaining_policies = []
for path in handoff_suffix_paths:
if os.path.exists(path):
policy = diskfile.extract_policy(path)
remaining_policies.append(policy)
self.assertEqual(len(remaining_policies), 1)
remaining_policy = remaining_policies[0]
# try again but with handoff_delete allowing for a single failure
with _mock_process(process_arg_checker) as rsync_log, \
mock.patch('swift.obj.replicator.whataremyips',
side_effect=_ips), \
mocked_http_conn(*[200] * 14, body=stub_body) as conn_log:
self.replicator.handoff_delete = 2
self.replicator.replicate()
# all jobs processed!
self.assertEqual(self.replicator.job_count,
self.replicator.replication_count)
self.assertFalse(self.replicator.handoffs_remaining)
# sanity, all parts got replicated
found_replicate_calls = defaultdict(int)
for req in conn_log.requests:
self.assertEqual(req['method'], 'REPLICATE')
found_replicate_key = (
int(req['headers']['X-Backend-Storage-Policy-Index']),
req['path'])
found_replicate_calls[found_replicate_key] += 1
expected_replicate_calls = {
(int(remaining_policy), '/sda/1/a83'): 2,
(0, '/sda/0'): 2,
(1, '/sda/0'): 2,
(0, '/sda/2'): 2,
(1, '/sda/2'): 2,
(0, '/sda/3'): 2,
(1, '/sda/3'): 2,
}
self.assertEqual(dict(found_replicate_calls),
expected_replicate_calls)
# and now all handoff partitions have been rebalanced away!
removed_paths = set()
for path in handoff_suffix_paths:
if not os.path.exists(path):
removed_paths.add(path)
self.assertEqual(removed_paths, set(handoff_suffix_paths))
def test_replicator_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the replicator
rmtree(self.objects)