diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index e644978d0a..842a2a859c 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -90,7 +90,7 @@ class ObjectReplicator(Daemon):
         self.node_timeout = float(conf.get('node_timeout', 10))
         self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
         self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
-        self.headers = {
+        self.default_headers = {
             'Content-Length': '0',
             'user-agent': 'object-replicator %s' % os.getpid()}
         self.rsync_error_log_line_length = \
@@ -271,7 +271,8 @@ class ObjectReplicator(Daemon):
                     if len(suff) == 3 and isdir(join(path, suff))]
         self.replication_count += 1
         self.logger.increment('partition.delete.count.%s' % (job['device'],))
-        self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+        headers = dict(self.default_headers)
+        headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
         failure_devs_info = set()
         begin = time.time()
         try:
@@ -297,7 +298,7 @@ class ObjectReplicator(Daemon):
                         conn = http_connect(
                             node['replication_ip'], node['replication_port'],
                             node['device'], job['partition'], 'REPLICATE',
-                            '/' + '-'.join(suffixes), headers=self.headers)
+                            '/' + '-'.join(suffixes), headers=headers)
                         conn.getresponse().read()
                     if node['region'] != job['region']:
                         synced_remote_regions[node['region']] = \
@@ -383,7 +384,8 @@ class ObjectReplicator(Daemon):
         """
         self.replication_count += 1
         self.logger.increment('partition.update.count.%s' % (job['device'],))
-        self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+        headers = dict(self.default_headers)
+        headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
         target_devs_info = set()
         failure_devs_info = set()
         begin = time.time()
@@ -415,7 +417,7 @@ class ObjectReplicator(Daemon):
                     resp = http_connect(
                         node['replication_ip'], node['replication_port'],
                         node['device'], job['partition'], 'REPLICATE',
-                        '', headers=self.headers).getresponse()
+                        '', headers=headers).getresponse()
                     if resp.status == HTTP_INSUFFICIENT_STORAGE:
                         self.logger.error(_('%(ip)s/%(device)s responded'
                                             ' as unmounted'), node)
@@ -455,7 +457,7 @@ class ObjectReplicator(Daemon):
                             node['replication_ip'], node['replication_port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
-                            headers=self.headers)
+                            headers=headers)
                         conn.getresponse().read()
                     if not success:
                         failure_devs_info.add((node['replication_ip'],
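Why the replicator change matters: jobs run concurrently on an eventlet GreenPool, and every job used to mutate the one shared self.headers dict. With more than one storage policy in play, a job could overwrite X-Backend-Storage-Policy-Index while another job was still sending its REPLICATE requests, so those requests could carry the wrong policy index. Copying self.default_headers into a job-local dict removes the shared state. Below is a minimal standalone sketch of that hazard; the names are illustrative, not Swift's:

import eventlet

# one dict shared by every job, like the old self.headers
shared_headers = {'user-agent': 'object-replicator'}


def replicate_job(policy_index, sent):
    # BUG: mutate the shared dict in place
    shared_headers['X-Backend-Storage-Policy-Index'] = policy_index
    eventlet.sleep(0)  # yield to other greenthreads, as real I/O would
    # record what this job would actually put on the wire
    sent.append((policy_index,
                 shared_headers['X-Backend-Storage-Policy-Index']))


sent = []
pool = eventlet.GreenPool()
for policy_index in (0, 1):
    pool.spawn(replicate_job, policy_index, sent)
pool.waitall()
print(sent)  # typically [(0, 1), (1, 1)]: the policy 0 job sent policy 1

# the fix: job-local state, as in headers = dict(self.default_headers)

A shallow dict() copy is sufficient here because the header dict holds only flat string values; nothing nested is shared between jobs.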
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index b9420981c2..7b5b3c14b2 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -22,12 +22,14 @@ import six.moves.cPickle as pickle
 import time
 import tempfile
 from contextlib import contextmanager, closing
+from collections import defaultdict
 from errno import ENOENT, ENOTEMPTY, ENOTDIR

 from eventlet.green import subprocess
 from eventlet import Timeout, tpool

-from test.unit import debug_logger, patch_policies
+from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
+                       mocked_http_conn)
 from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
     storage_directory
@@ -76,6 +78,7 @@ class MockProcess(object):
     ret_code = None
     ret_log = None
     check_args = None
+    captured_log = None

     class Stream(object):

@@ -99,20 +102,32 @@ class MockProcess(object):
             if targ not in args[0]:
                 process_errors.append("Invalid: %s not in %s" % (targ,
                                                                  args))
+        self.captured_info = {
+            'rsync_args': args[0],
+        }
         self.stdout = self.Stream()

     def wait(self):
-        return next(self.ret_code)
+        # the _mock_process context manager assures this class attribute is a
+        # mutable list and takes care of resetting it
+        rv = next(self.ret_code)
+        if self.captured_log is not None:
+            self.captured_info['ret_code'] = rv
+            self.captured_log.append(self.captured_info)
+        return rv


 @contextmanager
 def _mock_process(ret):
+    captured_log = []
+    MockProcess.captured_log = captured_log
     orig_process = subprocess.Popen
     MockProcess.ret_code = (i[0] for i in ret)
     MockProcess.ret_log = (i[1] for i in ret)
     MockProcess.check_args = (i[2] for i in ret)
     object_replicator.subprocess.Popen = MockProcess
-    yield
+    yield captured_log
+    MockProcess.captured_log = None
     object_replicator.subprocess.Popen = orig_process


@@ -180,8 +195,10 @@ class TestObjectReplicator(unittest.TestCase):
             swift_dir=self.testdir, devices=self.devices, mount_check='false',
             timeout='300', stats_interval='1', sync_method='rsync')
         self._create_replicator()
+        self.ts = make_timestamp_iter()

     def tearDown(self):
+        self.assertFalse(process_errors)
         rmtree(self.testdir, ignore_errors=1)

     def test_handoff_replication_setting_warnings(self):
@@ -670,6 +687,89 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertTrue(jobs[0]['delete'])
         self.assertEqual('1', jobs[0]['partition'])

+    def test_handoffs_first_mode_will_process_all_jobs_after_handoffs(self):
+        # make an object in the handoff & primary partition
+        expected_suffix_paths = []
+        for policy in POLICIES:
+            # primary
+            ts = next(self.ts)
+            df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', policy)
+            with df.create() as w:
+                w.write('asdf')
+                w.put({'X-Timestamp': ts.internal})
+                w.commit(ts)
+            expected_suffix_paths.append(os.path.dirname(df._datadir))
+            # handoff
+            ts = next(self.ts)
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
+            with df.create() as w:
+                w.write('asdf')
+                w.put({'X-Timestamp': ts.internal})
+                w.commit(ts)
+            expected_suffix_paths.append(os.path.dirname(df._datadir))
+
+        # rsync will be called for all parts we created objects in
+        process_arg_checker = [
+            # (return_code, stdout, check_args)
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),  # handoff job "first" policy
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),  # handoff job "second" policy
+            (0, '', []),
+            (0, '', []),  # update job "first" policy
+            (0, '', []),
+            (0, '', []),  # update job "second" policy
+        ]
+        # each handoff partition node gets one replicate request after rsync
+        # (2 * 3), each primary partition with objects gets two replicate
+        # requests (pre-flight and post sync) to each of its partners
+        # (2 * 2 * 2), and the 2 remaining empty parts (2 & 3) get a
+        # pre-flight replicate request per node for each storage policy
+        # (2 * 2 * 2) - so 6 + 8 + 8 == 22
+        replicate_responses = [200] * 22
+        stub_body = pickle.dumps({})
+        with _mock_process(process_arg_checker) as rsync_log, \
+                mock.patch('swift.obj.replicator.whataremyips',
+                           side_effect=_ips), \
+                mocked_http_conn(*replicate_responses,
+                                 body=stub_body) as conn_log:
+            self.replicator.handoffs_first = True
+            self.replicator.replicate()
+        # all jobs processed!
+        self.assertEqual(self.replicator.job_count,
+                         self.replicator.replication_count)
+
+        # sanity, all the handoff suffixes we filled in were rsync'd
+        found_rsync_suffix_paths = set()
+        for subprocess_info in rsync_log:
+            local_path, remote_path = subprocess_info['rsync_args'][-2:]
+            found_rsync_suffix_paths.add(local_path)
+        self.assertEqual(set(expected_suffix_paths), found_rsync_suffix_paths)
+        # sanity, all nodes got replicated
+        found_replicate_calls = defaultdict(int)
+        for req in conn_log.requests:
+            self.assertEqual(req['method'], 'REPLICATE')
+            found_replicate_key = (
+                int(req['headers']['X-Backend-Storage-Policy-Index']),
+                req['path'])
+            found_replicate_calls[found_replicate_key] += 1
+        expected_replicate_calls = {
+            (0, '/sda/1/a83'): 3,
+            (1, '/sda/1/a83'): 3,
+            (0, '/sda/0'): 2,
+            (0, '/sda/0/a83'): 2,
+            (1, '/sda/0'): 2,
+            (1, '/sda/0/a83'): 2,
+            (0, '/sda/2'): 2,
+            (1, '/sda/2'): 2,
+            (0, '/sda/3'): 2,
+            (1, '/sda/3'): 2,
+        }
+        self.assertEqual(dict(found_replicate_calls),
+                         expected_replicate_calls)
+
     def test_replicator_skips_bogus_partition_dirs(self):
         # A directory in the wrong place shouldn't crash the replicator
         rmtree(self.objects)
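The test-side changes are a recording-mock pattern: MockProcess.wait appends each fake rsync invocation (argv plus return code) to a class-level list, and _mock_process owns that list's lifecycle and yields it, which is what lets the new test iterate rsync_log. A condensed, self-contained sketch of the same pattern follows, with hypothetical names (RecordingPopen, mock_popen) rather than Swift's helpers, and a try/finally added so the real Popen is restored even if the test body raises:

import subprocess
from contextlib import contextmanager


class RecordingPopen(object):
    """Stands in for subprocess.Popen and records each invocation."""
    captured_log = None  # installed and reset by the context manager below

    def __init__(self, args, **kwargs):
        self.captured_info = {'args': args}

    def wait(self):
        rv = 0  # pretend every process exits cleanly
        if self.captured_log is not None:
            self.captured_info['ret_code'] = rv
            self.captured_log.append(self.captured_info)
        return rv


@contextmanager
def mock_popen():
    captured_log = []
    RecordingPopen.captured_log = captured_log
    orig_popen, subprocess.Popen = subprocess.Popen, RecordingPopen
    try:
        yield captured_log  # the test inspects this after the fact
    finally:
        # unlike the patch above, restore even on test failure
        RecordingPopen.captured_log = None
        subprocess.Popen = orig_popen


# usage, mirroring how the new test walks rsync_log:
with mock_popen() as call_log:
    subprocess.Popen(['rsync', '--stats', 'src/', 'dest/']).wait()
for info in call_log:
    print(info['args'], info['ret_code'])

Keeping the log's lifecycle in the context manager, rather than letting each test reset the class attribute, is what makes the "is not None" guard in wait() a reliable signal that recording is active.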