diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 8daeb051f2..1f30a5f682 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -281,6 +281,7 @@ class ObjectReplicator(Daemon):
         headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
         failure_devs_info = set()
         begin = time.time()
+        handoff_partition_deleted = False
         try:
             responses = []
             suffixes = tpool.execute(tpool_get_suffixes, job['path'])
@@ -347,8 +348,10 @@ class ObjectReplicator(Daemon):
                              for failure_dev in job['nodes']])
                 else:
                     self.delete_partition(job['path'])
+                    handoff_partition_deleted = True
             elif not suffixes:
                 self.delete_partition(job['path'])
+                handoff_partition_deleted = True
         except (Exception, Timeout):
             self.logger.exception(_("Error syncing handoff partition"))
         finally:
@@ -357,6 +360,8 @@ class ObjectReplicator(Daemon):
                                     for target_dev in job['nodes']])
             self.stats['success'] += len(target_devs_info - failure_devs_info)
             self._add_failure_stats(failure_devs_info)
+            if not handoff_partition_deleted:
+                self.handoffs_remaining += 1
             self.partition_times.append(time.time() - begin)
             self.logger.timing_since('partition.delete.timing', begin)
 
@@ -680,6 +685,7 @@ class ObjectReplicator(Daemon):
         self.partition_times = []
         self.my_replication_ips = self._get_my_replication_ips()
         self.all_devs_info = set()
+        self.handoffs_remaining = 0
 
         stats = eventlet.spawn(self.heartbeat)
         lockup_detector = eventlet.spawn(self.detect_lockups)
@@ -705,6 +711,15 @@ class ObjectReplicator(Daemon):
                                              for failure_dev in job['nodes']])
                     self.logger.warning(_('%s is not mounted'), job['device'])
                     continue
+                if self.handoffs_first and not job['delete']:
+                    # in handoffs first mode, we won't process primary
+                    # partitions until the rebalance has succeeded
+                    if self.handoffs_remaining:
+                        self.logger.warning(_(
+                            "Handoffs first mode still has handoffs "
+                            "remaining. Aborting current "
+                            "replication pass."))
+                        break
                 if not self.check_ring(job['policy'].object_ring):
                     self.logger.info(_("Ring change detected. Aborting "
                                        "current replication pass."))
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 1821c04506..5523a37451 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -765,6 +765,7 @@ class TestObjectReplicator(unittest.TestCase):
         # all jobs processed!
         self.assertEqual(self.replicator.job_count,
                          self.replicator.replication_count)
+        self.assertFalse(self.replicator.handoffs_remaining)
 
         # sanity, all the handoffs suffixes we filled in were rsync'd
         found_rsync_suffix_paths = set()
@@ -795,6 +796,109 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertEqual(dict(found_replicate_calls),
                          expected_replicate_calls)
 
+    def test_handoffs_first_mode_will_abort_if_handoffs_remaining(self):
+        # make an object in the handoff partition
+        handoff_suffix_paths = []
+        for policy in POLICIES:
+            ts = next(self.ts)
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
+            with df.create() as w:
+                w.write('asdf')
+                w.put({'X-Timestamp': ts.internal})
+                w.commit(ts)
+            handoff_suffix_paths.append(os.path.dirname(df._datadir))
+        process_arg_checker = [
+            # (return_code, stdout, )
+            (0, '', []),
+            (1, '', []),
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),
+        ]
+        stub_body = pickle.dumps({})
+        with _mock_process(process_arg_checker) as rsync_log, \
+                mock.patch('swift.obj.replicator.whataremyips',
+                           side_effect=_ips), \
+                mocked_http_conn(*[200] * 5, body=stub_body) as conn_log:
+            self.replicator.handoffs_first = True
+            self.replicator.replicate()
+        # stopped after handoffs!
+        self.assertEqual(1, self.replicator.handoffs_remaining)
+        self.assertEqual(8, self.replicator.job_count)
+        # in addition to the two update_deleted jobs, as many as "concurrency"
+        # jobs may have been spawned into the pool before the failed
+        # update_deleted job incremented handoffs_remaining and caused the
+        # handoffs_first check to abort the current pass
+        self.assertLessEqual(self.replicator.replication_count,
+                             2 + self.replicator.concurrency)
+
+        # sanity, all the handoffs suffixes we filled in were rsync'd
+        found_rsync_suffix_paths = set()
+        expected_replicate_requests = set()
+        for subprocess_info in rsync_log:
+            local_path, remote_path = subprocess_info['rsync_args'][-2:]
+            found_rsync_suffix_paths.add(local_path)
+            if subprocess_info['ret_code'] == 0:
+                node_ip = remote_path.split(':', 1)[0]
+                expected_replicate_requests.add(node_ip)
+        self.assertEqual(set(handoff_suffix_paths), found_rsync_suffix_paths)
+        # sanity, all successful rsync nodes got REPLICATE requests
+        found_replicate_requests = set()
+        self.assertEqual(5, len(conn_log.requests))
+        for req in conn_log.requests:
+            self.assertEqual(req['method'], 'REPLICATE')
+            found_replicate_requests.add(req['ip'])
+        self.assertEqual(expected_replicate_requests,
+                         found_replicate_requests)
+
+        # and at least one partition got removed!
+        remaining_policies = []
+        for path in handoff_suffix_paths:
+            if os.path.exists(path):
+                policy = diskfile.extract_policy(path)
+                remaining_policies.append(policy)
+        self.assertEqual(len(remaining_policies), 1)
+        remaining_policy = remaining_policies[0]
+
+        # try again but with handoff_delete allowing for a single failure
+        with _mock_process(process_arg_checker) as rsync_log, \
+                mock.patch('swift.obj.replicator.whataremyips',
+                           side_effect=_ips), \
+                mocked_http_conn(*[200] * 14, body=stub_body) as conn_log:
+            self.replicator.handoff_delete = 2
+            self.replicator.replicate()
+        # all jobs processed!
+        self.assertEqual(self.replicator.job_count,
+                         self.replicator.replication_count)
+        self.assertFalse(self.replicator.handoffs_remaining)
+        # sanity, all parts got replicated
+        found_replicate_calls = defaultdict(int)
+        for req in conn_log.requests:
+            self.assertEqual(req['method'], 'REPLICATE')
+            found_replicate_key = (
+                int(req['headers']['X-Backend-Storage-Policy-Index']),
+                req['path'])
+            found_replicate_calls[found_replicate_key] += 1
+        expected_replicate_calls = {
+            (int(remaining_policy), '/sda/1/a83'): 2,
+            (0, '/sda/0'): 2,
+            (1, '/sda/0'): 2,
+            (0, '/sda/2'): 2,
+            (1, '/sda/2'): 2,
+            (0, '/sda/3'): 2,
+            (1, '/sda/3'): 2,
+        }
+        self.assertEqual(dict(found_replicate_calls),
+                         expected_replicate_calls)
+
+        # and now all handoff partitions have been rebalanced away!
+        removed_paths = set()
+        for path in handoff_suffix_paths:
+            if not os.path.exists(path):
+                removed_paths.add(path)
+        self.assertEqual(removed_paths, set(handoff_suffix_paths))
+
     def test_replicator_skips_bogus_partition_dirs(self):
         # A directory in the wrong place shouldn't crash the replicator
         rmtree(self.objects)
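
Note (not part of the patch): below is a minimal, self-contained sketch of the gating behaviour this diff introduces, under the assumption of simplified job dicts. Every name here other than handoffs_first, handoffs_remaining and job['delete'] is hypothetical; the real logic lives in ObjectReplicator.update_deleted() and the replicate() job loop shown in the hunks above.

    # handoffs_first_sketch.py -- illustrative sketch only, not Swift code.
    # Assumptions: a "job" is just a dict with 'path', 'delete' (True for a
    # handoff partition) and a fake 'sync_ok' flag standing in for a fully
    # successful rsync/ssync; the real replicator derives all of this from
    # the ring and the filesystem.

    def run_pass(jobs, handoffs_first=True):
        """Return (paths processed, handoffs still remaining) for one pass."""
        handoffs_remaining = 0
        processed = []
        for job in jobs:
            if job['delete']:
                # handoff job: if it could not be rebalanced away, count it,
                # mirroring the patch's "handoffs_remaining += 1"
                processed.append(job['path'])
                if not job.get('sync_ok', False):
                    handoffs_remaining += 1
            elif handoffs_first and handoffs_remaining:
                # mirrors the new check in the job loop: with handoffs_first
                # set, abort the pass instead of touching primary partitions
                # while handoffs still need to be rebalanced away
                break
            else:
                processed.append(job['path'])
        return processed, handoffs_remaining


    if __name__ == '__main__':
        jobs = [
            {'path': '/srv/node/sda/objects/9', 'delete': True,
             'sync_ok': False},
            {'path': '/srv/node/sda/objects/1', 'delete': False},
        ]
        done, remaining = run_pass(jobs)
        print(done, remaining)  # the primary partition is skipped; remaining == 1

As the new test's assertLessEqual acknowledges, the real abort happens while jobs are already being fed to a green pool, so up to "concurrency" primary jobs may still start before the break takes effect; the sketch abstracts that away.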