Merge "Use correct Storage-Policy header for REPLICATE requests"
This commit is contained in:
commit
d703a532cb
@ -90,7 +90,7 @@ class ObjectReplicator(Daemon):
|
||||
self.node_timeout = float(conf.get('node_timeout', 10))
|
||||
self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
|
||||
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
|
||||
self.headers = {
|
||||
self.default_headers = {
|
||||
'Content-Length': '0',
|
||||
'user-agent': 'object-replicator %s' % os.getpid()}
|
||||
self.rsync_error_log_line_length = \
|
||||
@ -271,7 +271,8 @@ class ObjectReplicator(Daemon):
|
||||
if len(suff) == 3 and isdir(join(path, suff))]
|
||||
self.replication_count += 1
|
||||
self.logger.increment('partition.delete.count.%s' % (job['device'],))
|
||||
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
headers = dict(self.default_headers)
|
||||
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
failure_devs_info = set()
|
||||
begin = time.time()
|
||||
try:
|
||||
@ -297,7 +298,7 @@ class ObjectReplicator(Daemon):
|
||||
node['replication_ip'],
|
||||
node['replication_port'],
|
||||
node['device'], job['partition'], 'REPLICATE',
|
||||
'/' + '-'.join(suffixes), headers=self.headers)
|
||||
'/' + '-'.join(suffixes), headers=headers)
|
||||
conn.getresponse().read()
|
||||
if node['region'] != job['region']:
|
||||
synced_remote_regions[node['region']] = \
|
||||
@ -383,7 +384,8 @@ class ObjectReplicator(Daemon):
|
||||
"""
|
||||
self.replication_count += 1
|
||||
self.logger.increment('partition.update.count.%s' % (job['device'],))
|
||||
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
headers = dict(self.default_headers)
|
||||
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
target_devs_info = set()
|
||||
failure_devs_info = set()
|
||||
begin = time.time()
|
||||
@ -415,7 +417,7 @@ class ObjectReplicator(Daemon):
|
||||
resp = http_connect(
|
||||
node['replication_ip'], node['replication_port'],
|
||||
node['device'], job['partition'], 'REPLICATE',
|
||||
'', headers=self.headers).getresponse()
|
||||
'', headers=headers).getresponse()
|
||||
if resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||
self.logger.error(_('%(ip)s/%(device)s responded'
|
||||
' as unmounted'), node)
|
||||
@ -455,7 +457,7 @@ class ObjectReplicator(Daemon):
|
||||
node['replication_ip'], node['replication_port'],
|
||||
node['device'], job['partition'], 'REPLICATE',
|
||||
'/' + '-'.join(suffixes),
|
||||
headers=self.headers)
|
||||
headers=headers)
|
||||
conn.getresponse().read()
|
||||
if not success:
|
||||
failure_devs_info.add((node['replication_ip'],
|
||||
|
@ -22,12 +22,14 @@ import six.moves.cPickle as pickle
|
||||
import time
|
||||
import tempfile
|
||||
from contextlib import contextmanager, closing
|
||||
from collections import defaultdict
|
||||
from errno import ENOENT, ENOTEMPTY, ENOTDIR
|
||||
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import Timeout, tpool
|
||||
|
||||
from test.unit import debug_logger, patch_policies
|
||||
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
||||
mocked_http_conn)
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
storage_directory
|
||||
@ -76,6 +78,7 @@ class MockProcess(object):
|
||||
ret_code = None
|
||||
ret_log = None
|
||||
check_args = None
|
||||
captured_log = None
|
||||
|
||||
class Stream(object):
|
||||
|
||||
@ -99,20 +102,32 @@ class MockProcess(object):
|
||||
if targ not in args[0]:
|
||||
process_errors.append("Invalid: %s not in %s" % (targ,
|
||||
args))
|
||||
self.captured_info = {
|
||||
'rsync_args': args[0],
|
||||
}
|
||||
self.stdout = self.Stream()
|
||||
|
||||
def wait(self):
|
||||
return next(self.ret_code)
|
||||
# the _mock_process context manager assures this class attribute is a
|
||||
# mutable list and takes care of resetting it
|
||||
rv = next(self.ret_code)
|
||||
if self.captured_log is not None:
|
||||
self.captured_info['ret_code'] = rv
|
||||
self.captured_log.append(self.captured_info)
|
||||
return rv
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _mock_process(ret):
|
||||
captured_log = []
|
||||
MockProcess.captured_log = captured_log
|
||||
orig_process = subprocess.Popen
|
||||
MockProcess.ret_code = (i[0] for i in ret)
|
||||
MockProcess.ret_log = (i[1] for i in ret)
|
||||
MockProcess.check_args = (i[2] for i in ret)
|
||||
object_replicator.subprocess.Popen = MockProcess
|
||||
yield
|
||||
yield captured_log
|
||||
MockProcess.captured_log = None
|
||||
object_replicator.subprocess.Popen = orig_process
|
||||
|
||||
|
||||
@ -180,8 +195,10 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
||||
timeout='300', stats_interval='1', sync_method='rsync')
|
||||
self._create_replicator()
|
||||
self.ts = make_timestamp_iter()
|
||||
|
||||
def tearDown(self):
|
||||
self.assertFalse(process_errors)
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
|
||||
def test_handoff_replication_setting_warnings(self):
|
||||
@ -670,6 +687,89 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
self.assertTrue(jobs[0]['delete'])
|
||||
self.assertEqual('1', jobs[0]['partition'])
|
||||
|
||||
def test_handoffs_first_mode_will_process_all_jobs_after_handoffs(self):
|
||||
# make a object in the handoff & primary partition
|
||||
expected_suffix_paths = []
|
||||
for policy in POLICIES:
|
||||
# primary
|
||||
ts = next(self.ts)
|
||||
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', policy)
|
||||
with df.create() as w:
|
||||
w.write('asdf')
|
||||
w.put({'X-Timestamp': ts.internal})
|
||||
w.commit(ts)
|
||||
expected_suffix_paths.append(os.path.dirname(df._datadir))
|
||||
# handoff
|
||||
ts = next(self.ts)
|
||||
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
|
||||
with df.create() as w:
|
||||
w.write('asdf')
|
||||
w.put({'X-Timestamp': ts.internal})
|
||||
w.commit(ts)
|
||||
expected_suffix_paths.append(os.path.dirname(df._datadir))
|
||||
|
||||
# rsync will be called for all parts we created objects in
|
||||
process_arg_checker = [
|
||||
# (return_code, stdout, <each in capture rsync args>)
|
||||
(0, '', []),
|
||||
(0, '', []),
|
||||
(0, '', []), # handoff job "first" policy
|
||||
(0, '', []),
|
||||
(0, '', []),
|
||||
(0, '', []), # handoff job "second" policy
|
||||
(0, '', []),
|
||||
(0, '', []), # update job "first" policy
|
||||
(0, '', []),
|
||||
(0, '', []), # update job "second" policy
|
||||
]
|
||||
# each handoff partition node gets one replicate request for after
|
||||
# rsync (2 * 3), each primary partition with objects gets two
|
||||
# replicate requests (pre-flight and post sync) to each of each
|
||||
# partners (2 * 2 * 2), the 2 remaining empty parts (2 & 3) get a
|
||||
# pre-flight replicate request per node for each storage policy
|
||||
# (2 * 2 * 2) - so 6 + 8 + 8 == 22
|
||||
replicate_responses = [200] * 22
|
||||
stub_body = pickle.dumps({})
|
||||
with _mock_process(process_arg_checker) as rsync_log, \
|
||||
mock.patch('swift.obj.replicator.whataremyips',
|
||||
side_effect=_ips), \
|
||||
mocked_http_conn(*replicate_responses,
|
||||
body=stub_body) as conn_log:
|
||||
self.replicator.handoffs_first = True
|
||||
self.replicator.replicate()
|
||||
# all jobs processed!
|
||||
self.assertEqual(self.replicator.job_count,
|
||||
self.replicator.replication_count)
|
||||
|
||||
# sanity, all the handoffs suffixes we filled in were rsync'd
|
||||
found_rsync_suffix_paths = set()
|
||||
for subprocess_info in rsync_log:
|
||||
local_path, remote_path = subprocess_info['rsync_args'][-2:]
|
||||
found_rsync_suffix_paths.add(local_path)
|
||||
self.assertEqual(set(expected_suffix_paths), found_rsync_suffix_paths)
|
||||
# sanity, all nodes got replicated
|
||||
found_replicate_calls = defaultdict(int)
|
||||
for req in conn_log.requests:
|
||||
self.assertEqual(req['method'], 'REPLICATE')
|
||||
found_replicate_key = (
|
||||
int(req['headers']['X-Backend-Storage-Policy-Index']),
|
||||
req['path'])
|
||||
found_replicate_calls[found_replicate_key] += 1
|
||||
expected_replicate_calls = {
|
||||
(0, '/sda/1/a83'): 3,
|
||||
(1, '/sda/1/a83'): 3,
|
||||
(0, '/sda/0'): 2,
|
||||
(0, '/sda/0/a83'): 2,
|
||||
(1, '/sda/0'): 2,
|
||||
(1, '/sda/0/a83'): 2,
|
||||
(0, '/sda/2'): 2,
|
||||
(1, '/sda/2'): 2,
|
||||
(0, '/sda/3'): 2,
|
||||
(1, '/sda/3'): 2,
|
||||
}
|
||||
self.assertEquals(dict(found_replicate_calls),
|
||||
expected_replicate_calls)
|
||||
|
||||
def test_replicator_skips_bogus_partition_dirs(self):
|
||||
# A directory in the wrong place shouldn't crash the replicator
|
||||
rmtree(self.objects)
|
||||
|
Loading…
x
Reference in New Issue
Block a user