Use correct Storage-Policy header for REPLICATE requests

Under some concurrency the object-replicator could send the wrong
X-Backend-Storage-Policy-Index header to its partner nodes during
replication when multiple storage policies were present on the same
node. The cause was a race in which multiple jobs being processed
concurrently would mutate shared state on the ObjectReplicator
instance.
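
A minimal sketch of the race (hypothetical code, not part of this
patch): two eventlet greenthreads share one header dict, so the first
job's request can go out carrying the second job's policy index:

    import eventlet

    shared_headers = {'user-agent': 'object-replicator'}
    sent = []

    def job(policy_index):
        # pre-patch pattern: every job writes into the *shared* dict
        shared_headers['X-Backend-Storage-Policy-Index'] = policy_index
        eventlet.sleep(0)  # yield, as the real code does during network I/O
        # whatever is in the dict now is what the request would carry
        sent.append((policy_index,
                     shared_headers['X-Backend-Storage-Policy-Index']))

    pool = eventlet.GreenPool()
    for policy in (0, 1):
        pool.spawn(job, policy)
    pool.waitall()
    print(sent)  # [(0, 1), (1, 1)] -- job 0 "sent" job 1's policy index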

Instead of mutating shared state on the ObjectReplicator instance
when building the default headers sent with REPLICATE requests, each
job now copies them into a local dict where they can safely be
updated.
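
The per-job copy pattern the patch adopts, sketched with hypothetical
names (build_job_headers is illustrative; the real change inlines the
copy in update() and update_deleted()):

    class Replicator(object):  # hypothetical stand-in for ObjectReplicator
        def __init__(self):
            # template built once at startup; treated as read-only afterwards
            self.default_headers = {
                'Content-Length': '0',
                'user-agent': 'object-replicator'}

        def build_job_headers(self, job):  # hypothetical helper
            # each job mutates only its own copy, so concurrent jobs can
            # no longer clobber each other's policy index
            headers = dict(self.default_headers)
            headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
            return headers

Copying a two-key dict per job is cheap next to a REPLICATE round
trip, which is why a per-job copy is preferable to locking here.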

Change-Id: I5522db57af7e308b1f9d4181f14ea14e386a71fd
Clay Gerrard 2015-08-21 18:14:55 -07:00
parent 2d41ff7b45
commit a38f63e1c6
2 changed files with 111 additions and 9 deletions

swift/obj/replicator.py

@@ -90,7 +90,7 @@ class ObjectReplicator(Daemon):
self.node_timeout = float(conf.get('node_timeout', 10))
self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
-self.headers = {
+self.default_headers = {
'Content-Length': '0',
'user-agent': 'object-replicator %s' % os.getpid()}
self.rsync_error_log_line_length = \
@@ -270,7 +270,8 @@ class ObjectReplicator(Daemon):
if len(suff) == 3 and isdir(join(path, suff))]
self.replication_count += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
-self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+headers = dict(self.default_headers)
+headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
failure_devs_info = set()
begin = time.time()
try:
@@ -296,7 +297,7 @@ class ObjectReplicator(Daemon):
node['replication_ip'],
node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
-'/' + '-'.join(suffixes), headers=self.headers)
+'/' + '-'.join(suffixes), headers=headers)
conn.getresponse().read()
if node['region'] != job['region']:
synced_remote_regions[node['region']] = \
@@ -382,7 +383,8 @@ class ObjectReplicator(Daemon):
"""
self.replication_count += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
-self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+headers = dict(self.default_headers)
+headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
target_devs_info = set()
failure_devs_info = set()
begin = time.time()
@@ -414,7 +416,7 @@ class ObjectReplicator(Daemon):
resp = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
-'', headers=self.headers).getresponse()
+'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(_('%(ip)s/%(device)s responded'
' as unmounted'), node)
@@ -454,7 +456,7 @@ class ObjectReplicator(Daemon):
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes),
-headers=self.headers)
+headers=headers)
conn.getresponse().read()
if not success:
failure_devs_info.add((node['replication_ip'],

test/unit/obj/test_replicator.py

@@ -22,12 +22,14 @@ import six.moves.cPickle as pickle
import time
import tempfile
from contextlib import contextmanager, closing
+from collections import defaultdict
from errno import ENOENT, ENOTEMPTY, ENOTDIR
from eventlet.green import subprocess
from eventlet import Timeout, tpool
-from test.unit import debug_logger, patch_policies
+from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
+mocked_http_conn)
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
storage_directory
@@ -76,6 +78,7 @@ class MockProcess(object):
ret_code = None
ret_log = None
check_args = None
+captured_log = None
class Stream(object):
@@ -99,20 +102,32 @@ class MockProcess(object):
if targ not in args[0]:
process_errors.append("Invalid: %s not in %s" % (targ,
args))
+self.captured_info = {
+'rsync_args': args[0],
+}
self.stdout = self.Stream()
def wait(self):
-return next(self.ret_code)
+# the _mock_process context manager assures this class attribute is a
+# mutable list and takes care of resetting it
+rv = next(self.ret_code)
+if self.captured_log is not None:
+self.captured_info['ret_code'] = rv
+self.captured_log.append(self.captured_info)
+return rv
@contextmanager
def _mock_process(ret):
+captured_log = []
+MockProcess.captured_log = captured_log
orig_process = subprocess.Popen
MockProcess.ret_code = (i[0] for i in ret)
MockProcess.ret_log = (i[1] for i in ret)
MockProcess.check_args = (i[2] for i in ret)
object_replicator.subprocess.Popen = MockProcess
-yield
+yield captured_log
+MockProcess.captured_log = None
object_replicator.subprocess.Popen = orig_process
@@ -180,8 +195,10 @@ class TestObjectReplicator(unittest.TestCase):
swift_dir=self.testdir, devices=self.devices, mount_check='false',
timeout='300', stats_interval='1', sync_method='rsync')
self._create_replicator()
+self.ts = make_timestamp_iter()
def tearDown(self):
+self.assertFalse(process_errors)
rmtree(self.testdir, ignore_errors=1)
def test_handoff_replication_setting_warnings(self):
@@ -659,6 +676,89 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(jobs[0]['delete'])
self.assertEqual('1', jobs[0]['partition'])
+def test_handoffs_first_mode_will_process_all_jobs_after_handoffs(self):
+# make an object in the handoff & primary partition
+expected_suffix_paths = []
+for policy in POLICIES:
+# primary
+ts = next(self.ts)
+df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', policy)
+with df.create() as w:
+w.write('asdf')
+w.put({'X-Timestamp': ts.internal})
+w.commit(ts)
+expected_suffix_paths.append(os.path.dirname(df._datadir))
+# handoff
+ts = next(self.ts)
+df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
+with df.create() as w:
+w.write('asdf')
+w.put({'X-Timestamp': ts.internal})
+w.commit(ts)
+expected_suffix_paths.append(os.path.dirname(df._datadir))
+# rsync will be called for all parts we created objects in
+process_arg_checker = [
+# (return_code, stdout, <each in capture rsync args>)
+(0, '', []),
+(0, '', []),
+(0, '', []),  # handoff job "first" policy
+(0, '', []),
+(0, '', []),
+(0, '', []),  # handoff job "second" policy
+(0, '', []),
+(0, '', []),  # update job "first" policy
+(0, '', []),
+(0, '', []),  # update job "second" policy
+]
+# each handoff partition node gets one replicate request after
+# rsync (2 * 3), each primary partition with objects gets two
+# replicate requests (pre-flight and post sync) to each of its
+# partners (2 * 2 * 2), the 2 remaining empty parts (2 & 3) get a
+# pre-flight replicate request per node for each storage policy
+# (2 * 2 * 2) - so 6 + 8 + 8 == 22
+replicate_responses = [200] * 22
+stub_body = pickle.dumps({})
+with _mock_process(process_arg_checker) as rsync_log, \
+mock.patch('swift.obj.replicator.whataremyips',
+side_effect=_ips), \
+mocked_http_conn(*replicate_responses,
+body=stub_body) as conn_log:
+self.replicator.handoffs_first = True
+self.replicator.replicate()
+# all jobs processed!
+self.assertEqual(self.replicator.job_count,
+self.replicator.replication_count)
+# sanity, all the handoff suffixes we filled in were rsync'd
+found_rsync_suffix_paths = set()
+for subprocess_info in rsync_log:
+local_path, remote_path = subprocess_info['rsync_args'][-2:]
+found_rsync_suffix_paths.add(local_path)
+self.assertEqual(set(expected_suffix_paths), found_rsync_suffix_paths)
+# sanity, all nodes got replicated
+found_replicate_calls = defaultdict(int)
+for req in conn_log.requests:
+self.assertEqual(req['method'], 'REPLICATE')
+found_replicate_key = (
+int(req['headers']['X-Backend-Storage-Policy-Index']),
+req['path'])
+found_replicate_calls[found_replicate_key] += 1
+expected_replicate_calls = {
+(0, '/sda/1/a83'): 3,
+(1, '/sda/1/a83'): 3,
+(0, '/sda/0'): 2,
+(0, '/sda/0/a83'): 2,
+(1, '/sda/0'): 2,
+(1, '/sda/0/a83'): 2,
+(0, '/sda/2'): 2,
+(1, '/sda/2'): 2,
+(0, '/sda/3'): 2,
+(1, '/sda/3'): 2,
+}
+self.assertEquals(dict(found_replicate_calls),
+expected_replicate_calls)
def test_replicator_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the replicator
rmtree(self.objects)