From a38f63e1c6b8b85b1675aa900e239a2e9906811e Mon Sep 17 00:00:00 2001
From: Clay Gerrard
Date: Fri, 21 Aug 2015 18:14:55 -0700
Subject: [PATCH] Use correct Storage-Policy header for REPLICATE requests

Under some concurrency the object-replicator could send the wrong
X-Backend-Storage-Policy-Index header to its partner nodes during
replication when multiple storage policies were in use on the same
node, because of a race where multiple jobs being processed
concurrently would mutate shared state on the ObjectReplicator
instance.

Instead of mutating the default headers shared on the ObjectReplicator
instance, each job now copies them into a local dict where the policy
index header can safely be set before the REPLICATE requests are sent.

Change-Id: I5522db57af7e308b1f9d4181f14ea14e386a71fd
---
 swift/obj/replicator.py          |  14 ++--
 test/unit/obj/test_replicator.py | 106 ++++++++++++++++++++++++++++++-
 2 files changed, 111 insertions(+), 9 deletions(-)

diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 6526b14038..8ae393088b 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -90,7 +90,7 @@ class ObjectReplicator(Daemon):
         self.node_timeout = float(conf.get('node_timeout', 10))
         self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
         self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
-        self.headers = {
+        self.default_headers = {
             'Content-Length': '0',
             'user-agent': 'object-replicator %s' % os.getpid()}
         self.rsync_error_log_line_length = \
@@ -270,7 +270,8 @@ class ObjectReplicator(Daemon):
                         if len(suff) == 3 and isdir(join(path, suff))]
         self.replication_count += 1
         self.logger.increment('partition.delete.count.%s' % (job['device'],))
-        self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+        headers = dict(self.default_headers)
+        headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
         failure_devs_info = set()
         begin = time.time()
         try:
@@ -296,7 +297,7 @@ class ObjectReplicator(Daemon):
                         node['replication_ip'],
                         node['replication_port'],
                         node['device'], job['partition'], 'REPLICATE',
-                        '/' + '-'.join(suffixes), headers=self.headers)
+                        '/' + '-'.join(suffixes), headers=headers)
                     conn.getresponse().read()
                     if node['region'] != job['region']:
                         synced_remote_regions[node['region']] = \
@@ -382,7 +383,8 @@ class ObjectReplicator(Daemon):
         """
         self.replication_count += 1
         self.logger.increment('partition.update.count.%s' % (job['device'],))
-        self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+        headers = dict(self.default_headers)
+        headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
         target_devs_info = set()
         failure_devs_info = set()
         begin = time.time()
@@ -414,7 +416,7 @@ class ObjectReplicator(Daemon):
                     resp = http_connect(
                         node['replication_ip'], node['replication_port'],
                         node['device'], job['partition'], 'REPLICATE',
-                        '', headers=self.headers).getresponse()
+                        '', headers=headers).getresponse()
                     if resp.status == HTTP_INSUFFICIENT_STORAGE:
                         self.logger.error(_('%(ip)s/%(device)s responded'
                                             ' as unmounted'), node)
@@ -454,7 +456,7 @@ class ObjectReplicator(Daemon):
                             node['replication_ip'], node['replication_port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
-                            headers=self.headers)
+                            headers=headers)
                         conn.getresponse().read()
                 if not success:
                     failure_devs_info.add((node['replication_ip'],
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index e90b2bc137..27f06c9608 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -22,12 +22,14 @@ import six.moves.cPickle as pickle
 import time
 import tempfile
 from contextlib import contextmanager, closing
+from collections import defaultdict
 from errno import ENOENT, ENOTEMPTY, ENOTDIR

 from eventlet.green import subprocess
 from eventlet import Timeout, tpool

-from test.unit import debug_logger, patch_policies
+from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
+                       mocked_http_conn)
 from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
     storage_directory
@@ -76,6 +78,7 @@ class MockProcess(object):
     ret_code = None
     ret_log = None
     check_args = None
+    captured_log = None

     class Stream(object):
@@ -99,20 +102,32 @@ class MockProcess(object):
             if targ not in args[0]:
                 process_errors.append("Invalid: %s not in %s" % (targ, args))
+        self.captured_info = {
+            'rsync_args': args[0],
+        }
         self.stdout = self.Stream()

     def wait(self):
-        return next(self.ret_code)
+        # the _mock_process context manager assures this class attribute is a
+        # mutable list and takes care of resetting it
+        rv = next(self.ret_code)
+        if self.captured_log is not None:
+            self.captured_info['ret_code'] = rv
+            self.captured_log.append(self.captured_info)
+        return rv


 @contextmanager
 def _mock_process(ret):
+    captured_log = []
+    MockProcess.captured_log = captured_log
     orig_process = subprocess.Popen
     MockProcess.ret_code = (i[0] for i in ret)
     MockProcess.ret_log = (i[1] for i in ret)
     MockProcess.check_args = (i[2] for i in ret)
     object_replicator.subprocess.Popen = MockProcess
-    yield
+    yield captured_log
+    MockProcess.captured_log = None
     object_replicator.subprocess.Popen = orig_process
@@ -180,8 +195,10 @@ class TestObjectReplicator(unittest.TestCase):
             swift_dir=self.testdir, devices=self.devices, mount_check='false',
             timeout='300', stats_interval='1', sync_method='rsync')
         self._create_replicator()
+        self.ts = make_timestamp_iter()

     def tearDown(self):
+        self.assertFalse(process_errors)
         rmtree(self.testdir, ignore_errors=1)

     def test_handoff_replication_setting_warnings(self):
@@ -659,6 +676,89 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertTrue(jobs[0]['delete'])
         self.assertEqual('1', jobs[0]['partition'])

+    def test_handoffs_first_mode_will_process_all_jobs_after_handoffs(self):
+        # make an object in the handoff & primary partition
+        expected_suffix_paths = []
+        for policy in POLICIES:
+            # primary
+            ts = next(self.ts)
+            df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', policy)
+            with df.create() as w:
+                w.write('asdf')
+                w.put({'X-Timestamp': ts.internal})
+                w.commit(ts)
+            expected_suffix_paths.append(os.path.dirname(df._datadir))
+            # handoff
+            ts = next(self.ts)
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
+            with df.create() as w:
+                w.write('asdf')
+                w.put({'X-Timestamp': ts.internal})
+                w.commit(ts)
+            expected_suffix_paths.append(os.path.dirname(df._datadir))
+
+        # rsync will be called for all parts we created objects in
+        process_arg_checker = [
+            # (return_code, stdout, check_args)
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),  # handoff job "first" policy
+            (0, '', []),
+            (0, '', []),
+            (0, '', []),  # handoff job "second" policy
+            (0, '', []),
+            (0, '', []),  # update job "first" policy
+            (0, '', []),
+            (0, '', []),  # update job "second" policy
+        ]
+        # each handoff partition node gets one replicate request after
+        # rsync (2 * 3), each primary partition with objects gets two
+        # replicate requests (pre-flight and post sync) to each of its
+        # partners (2 * 2 * 2), and the 2 remaining empty parts (2 & 3)
+        # get a pre-flight replicate request per node for each storage
+        # policy (2 * 2 * 2) - so 6 + 8 + 8 == 22
+        replicate_responses = [200] * 22
+        stub_body = pickle.dumps({})
+        with _mock_process(process_arg_checker) as rsync_log, \
+            mock.patch('swift.obj.replicator.whataremyips',
+                       side_effect=_ips), \
+                mocked_http_conn(*replicate_responses,
+                                 body=stub_body) as conn_log:
+            self.replicator.handoffs_first = True
+            self.replicator.replicate()
+        # all jobs processed!
+        self.assertEqual(self.replicator.job_count,
+                         self.replicator.replication_count)
+
+        # sanity, all the handoff suffixes we filled in were rsync'd
+        found_rsync_suffix_paths = set()
+        for subprocess_info in rsync_log:
+            local_path, remote_path = subprocess_info['rsync_args'][-2:]
+            found_rsync_suffix_paths.add(local_path)
+        self.assertEqual(set(expected_suffix_paths), found_rsync_suffix_paths)
+        # sanity, all nodes got replicated
+        found_replicate_calls = defaultdict(int)
+        for req in conn_log.requests:
+            self.assertEqual(req['method'], 'REPLICATE')
+            found_replicate_key = (
+                int(req['headers']['X-Backend-Storage-Policy-Index']),
+                req['path'])
+            found_replicate_calls[found_replicate_key] += 1
+        expected_replicate_calls = {
+            (0, '/sda/1/a83'): 3,
+            (1, '/sda/1/a83'): 3,
+            (0, '/sda/0'): 2,
+            (0, '/sda/0/a83'): 2,
+            (1, '/sda/0'): 2,
+            (1, '/sda/0/a83'): 2,
+            (0, '/sda/2'): 2,
+            (1, '/sda/2'): 2,
+            (0, '/sda/3'): 2,
+            (1, '/sda/3'): 2,
+        }
+        self.assertEqual(dict(found_replicate_calls),
+                         expected_replicate_calls)
+
     def test_replicator_skips_bogus_partition_dirs(self):
         # A directory in the wrong place shouldn't crash the replicator
         rmtree(self.objects)
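
To see the race this patch closes in isolation, here is a minimal sketch
assuming nothing from Swift: ReplicatorLike, run_job_buggy, and
run_job_fixed are hypothetical names invented for the example, plain
threading stands in for the replicator's concurrent job processing, and
the sleep() stands in for the network I/O during which another job can
run. In the buggy variant both jobs mutate one shared headers dict, so
the slower job sends whichever policy index was written last; copying
the defaults into a job-local dict, as the patch does, gives each job
private headers.

import threading
import time


class ReplicatorLike(object):
    # hypothetical stand-in for ObjectReplicator; not Swift code

    def __init__(self):
        # analogous to self.default_headers on ObjectReplicator
        self.default_headers = {'Content-Length': '0'}
        self.sent_policy_indexes = []

    def run_job_buggy(self, policy_index):
        # pre-patch pattern: mutate the shared dict in place
        self.default_headers['X-Backend-Storage-Policy-Index'] = policy_index
        time.sleep(0.01)  # stand-in for I/O; another job can run here
        self.sent_policy_indexes.append(
            self.default_headers['X-Backend-Storage-Policy-Index'])

    def run_job_fixed(self, policy_index):
        # post-patch pattern: copy the defaults into a job-local dict
        headers = dict(self.default_headers)
        headers['X-Backend-Storage-Policy-Index'] = policy_index
        time.sleep(0.01)
        self.sent_policy_indexes.append(
            headers['X-Backend-Storage-Policy-Index'])


def run_two_jobs(method_name):
    replicator = ReplicatorLike()
    threads = [threading.Thread(target=getattr(replicator, method_name),
                                args=(policy_index,))
               for policy_index in (0, 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(replicator.sent_policy_indexes)


print(run_two_jobs('run_job_buggy'))  # typically [1, 1]: job 0 clobbered
print(run_two_jobs('run_job_fixed'))  # always [0, 1]

The dict() copy is cheap next to a REPLICATE round trip; a lock around
the shared dict would also close the race, but it would needlessly
serialize jobs that are otherwise safe to run concurrently.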