f56d18e143
Change-Id: I4f98d447bd2ddd8eeb2f4da66d069bd7d6f00dc6
1811 lines
81 KiB
Python
1811 lines
81 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import unittest
|
|
import os
|
|
import mock
|
|
from gzip import GzipFile
|
|
from shutil import rmtree
|
|
import six.moves.cPickle as pickle
|
|
import time
|
|
import tempfile
|
|
from contextlib import contextmanager, closing
|
|
from collections import defaultdict
|
|
from errno import ENOENT, ENOTEMPTY, ENOTDIR
|
|
|
|
from eventlet.green import subprocess
|
|
from eventlet import Timeout, tpool
|
|
|
|
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
|
mocked_http_conn)
|
|
from swift.common import utils
|
|
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
|
|
storage_directory)
|
|
from swift.common import ring
|
|
from swift.obj import diskfile, replicator as object_replicator
|
|
from swift.common.storage_policy import StoragePolicy, POLICIES
|
|
|
|
|
|
def _ips(*args, **kwargs):
|
|
return ['127.0.0.0']
|
|
|
|
|
|
def mock_http_connect(status):
|
|
|
|
class FakeConn(object):
|
|
|
|
def __init__(self, status, *args, **kwargs):
|
|
self.status = status
|
|
self.reason = 'Fake'
|
|
self.host = args[0]
|
|
self.port = args[1]
|
|
self.method = args[4]
|
|
self.path = args[5]
|
|
self.with_exc = False
|
|
self.headers = kwargs.get('headers', {})
|
|
|
|
def getresponse(self):
|
|
if self.with_exc:
|
|
raise Exception('test')
|
|
return self
|
|
|
|
def getheader(self, header):
|
|
return self.headers[header]
|
|
|
|
def read(self, amt=None):
|
|
return pickle.dumps({})
|
|
|
|
def close(self):
|
|
return
|
|
return lambda *args, **kwargs: FakeConn(status, *args, **kwargs)
|
|
|
|
process_errors = []
|
|
|
|
|
|
class MockProcess(object):
|
|
ret_code = None
|
|
ret_log = None
|
|
check_args = None
|
|
captured_log = None
|
|
|
|
class Stream(object):
|
|
|
|
def read(self):
|
|
return next(MockProcess.ret_log)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
targs = next(MockProcess.check_args)
|
|
for targ in targs:
|
|
# Allow more than 2 candidate targs
|
|
# (e.g. a case that either node is fine when nodes shuffled)
|
|
if isinstance(targ, tuple):
|
|
allowed = False
|
|
for target in targ:
|
|
if target in args[0]:
|
|
allowed = True
|
|
if not allowed:
|
|
process_errors.append("Invalid: %s not in %s" % (targ,
|
|
args))
|
|
else:
|
|
if targ not in args[0]:
|
|
process_errors.append("Invalid: %s not in %s" % (targ,
|
|
args))
|
|
self.captured_info = {
|
|
'rsync_args': args[0],
|
|
}
|
|
self.stdout = self.Stream()
|
|
|
|
def wait(self):
|
|
# the _mock_process context manager assures this class attribute is a
|
|
# mutable list and takes care of resetting it
|
|
rv = next(self.ret_code)
|
|
if self.captured_log is not None:
|
|
self.captured_info['ret_code'] = rv
|
|
self.captured_log.append(self.captured_info)
|
|
return rv
|
|
|
|
|
|
@contextmanager
|
|
def _mock_process(ret):
|
|
captured_log = []
|
|
MockProcess.captured_log = captured_log
|
|
orig_process = subprocess.Popen
|
|
MockProcess.ret_code = (i[0] for i in ret)
|
|
MockProcess.ret_log = (i[1] for i in ret)
|
|
MockProcess.check_args = (i[2] for i in ret)
|
|
object_replicator.subprocess.Popen = MockProcess
|
|
yield captured_log
|
|
MockProcess.captured_log = None
|
|
object_replicator.subprocess.Popen = orig_process
|
|
|
|
|
|
def _create_test_rings(path, devs=None):
|
|
testgz = os.path.join(path, 'object.ring.gz')
|
|
intended_replica2part2dev_id = [
|
|
[0, 1, 2, 3, 4, 5, 6],
|
|
[1, 2, 3, 0, 5, 6, 4],
|
|
[2, 3, 0, 1, 6, 4, 5],
|
|
]
|
|
intended_devs = devs or [
|
|
{'id': 0, 'device': 'sda', 'zone': 0,
|
|
'region': 1, 'ip': '127.0.0.0', 'port': 6000},
|
|
{'id': 1, 'device': 'sda', 'zone': 1,
|
|
'region': 2, 'ip': '127.0.0.1', 'port': 6000},
|
|
{'id': 2, 'device': 'sda', 'zone': 2,
|
|
'region': 3, 'ip': '127.0.0.2', 'port': 6000},
|
|
{'id': 3, 'device': 'sda', 'zone': 4,
|
|
'region': 2, 'ip': '127.0.0.3', 'port': 6000},
|
|
{'id': 4, 'device': 'sda', 'zone': 5,
|
|
'region': 1, 'ip': '127.0.0.4', 'port': 6000},
|
|
{'id': 5, 'device': 'sda', 'zone': 6,
|
|
'region': 3, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
|
|
{'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
|
|
'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
|
|
]
|
|
intended_part_shift = 30
|
|
with closing(GzipFile(testgz, 'wb')) as f:
|
|
pickle.dump(
|
|
ring.RingData(intended_replica2part2dev_id,
|
|
intended_devs, intended_part_shift),
|
|
f)
|
|
|
|
testgz = os.path.join(path, 'object-1.ring.gz')
|
|
with closing(GzipFile(testgz, 'wb')) as f:
|
|
pickle.dump(
|
|
ring.RingData(intended_replica2part2dev_id,
|
|
intended_devs, intended_part_shift),
|
|
f)
|
|
for policy in POLICIES:
|
|
policy.object_ring = None # force reload
|
|
return
|
|
|
|
|
|
@patch_policies([StoragePolicy(0, 'zero', False),
|
|
StoragePolicy(1, 'one', True)])
|
|
class TestObjectReplicator(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
utils.HASH_PATH_SUFFIX = 'endcap'
|
|
utils.HASH_PATH_PREFIX = ''
|
|
# recon cache path
|
|
self.recon_cache = tempfile.mkdtemp()
|
|
rmtree(self.recon_cache, ignore_errors=1)
|
|
os.mkdir(self.recon_cache)
|
|
# Setup a test ring (stolen from common/test_ring.py)
|
|
self.testdir = tempfile.mkdtemp()
|
|
self.devices = os.path.join(self.testdir, 'node')
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
os.mkdir(self.testdir)
|
|
os.mkdir(self.devices)
|
|
|
|
self.objects, self.objects_1, self.parts, self.parts_1 = \
|
|
self._write_disk_data('sda')
|
|
_create_test_rings(self.testdir)
|
|
self.logger = debug_logger('test-replicator')
|
|
self.conf = dict(
|
|
bind_ip=_ips()[0], bind_port=6000,
|
|
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
|
timeout='300', stats_interval='1', sync_method='rsync')
|
|
self._create_replicator()
|
|
self.ts = make_timestamp_iter()
|
|
|
|
def tearDown(self):
|
|
self.assertFalse(process_errors)
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
rmtree(self.recon_cache, ignore_errors=1)
|
|
|
|
def test_handoff_replication_setting_warnings(self):
|
|
conf_tests = [
|
|
# (config, expected_warning)
|
|
({}, False),
|
|
({'handoff_delete': 'auto'}, False),
|
|
({'handoffs_first': 'no'}, False),
|
|
({'handoff_delete': '2'}, True),
|
|
({'handoffs_first': 'yes'}, True),
|
|
({'handoff_delete': '1', 'handoffs_first': 'yes'}, True),
|
|
]
|
|
log_message = 'Handoff only mode is not intended for normal ' \
|
|
'operation, please disable handoffs_first and ' \
|
|
'handoff_delete before the next normal rebalance'
|
|
for config, expected_warning in conf_tests:
|
|
self.logger.clear()
|
|
object_replicator.ObjectReplicator(config, logger=self.logger)
|
|
warning_log_lines = self.logger.get_lines_for_level('warning')
|
|
if expected_warning:
|
|
expected_log_lines = [log_message]
|
|
else:
|
|
expected_log_lines = []
|
|
self.assertEqual(expected_log_lines, warning_log_lines,
|
|
'expected %s != %s for config %r' % (
|
|
expected_log_lines,
|
|
warning_log_lines,
|
|
config,
|
|
))
|
|
|
|
def _write_disk_data(self, disk_name):
|
|
os.mkdir(os.path.join(self.devices, disk_name))
|
|
objects = os.path.join(self.devices, disk_name,
|
|
diskfile.get_data_dir(POLICIES[0]))
|
|
objects_1 = os.path.join(self.devices, disk_name,
|
|
diskfile.get_data_dir(POLICIES[1]))
|
|
os.mkdir(objects)
|
|
os.mkdir(objects_1)
|
|
parts = {}
|
|
parts_1 = {}
|
|
for part in ['0', '1', '2', '3']:
|
|
parts[part] = os.path.join(objects, part)
|
|
os.mkdir(parts[part])
|
|
parts_1[part] = os.path.join(objects_1, part)
|
|
os.mkdir(parts_1[part])
|
|
|
|
return objects, objects_1, parts, parts_1
|
|
|
|
def _create_replicator(self):
|
|
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
|
self.replicator.logger = self.logger
|
|
self.replicator._zero_stats()
|
|
self.replicator.all_devs_info = set()
|
|
self.df_mgr = diskfile.DiskFileManager(self.conf, self.logger)
|
|
|
|
def test_run_once_no_local_device_in_ring(self):
|
|
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
|
bind_ip='1.1.1.1', recon_cache_path=self.recon_cache,
|
|
mount_check='false', timeout='300', stats_interval='1')
|
|
replicator = object_replicator.ObjectReplicator(conf,
|
|
logger=self.logger)
|
|
replicator.run_once()
|
|
expected = [
|
|
"Can't find itself 1.1.1.1 with port 6000 "
|
|
"in ring file, not replicating",
|
|
"Can't find itself 1.1.1.1 with port 6000 "
|
|
"in ring file, not replicating",
|
|
]
|
|
self.assertEqual(expected, self.logger.get_lines_for_level('error'))
|
|
|
|
def test_run_once(self):
|
|
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
|
bind_ip=_ips()[0], recon_cache_path=self.recon_cache,
|
|
mount_check='false', timeout='300', stats_interval='1')
|
|
replicator = object_replicator.ObjectReplicator(conf,
|
|
logger=self.logger)
|
|
was_connector = object_replicator.http_connect
|
|
object_replicator.http_connect = mock_http_connect(200)
|
|
cur_part = '0'
|
|
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
|
|
policy=POLICIES[0])
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
|
|
process_arg_checker = []
|
|
ring = replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(int(cur_part))
|
|
if node['ip'] not in _ips()]
|
|
rsync_mods = tuple(['%s::object/sda/objects/%s' %
|
|
(node['ip'], cur_part) for node in nodes])
|
|
for node in nodes:
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mods]))
|
|
with _mock_process(process_arg_checker):
|
|
replicator.run_once()
|
|
self.assertFalse(process_errors)
|
|
self.assertFalse(self.logger.get_lines_for_level('error'))
|
|
object_replicator.http_connect = was_connector
|
|
|
|
# policy 1
|
|
def test_run_once_1(self):
|
|
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
|
recon_cache_path=self.recon_cache,
|
|
mount_check='false', timeout='300', stats_interval='1')
|
|
replicator = object_replicator.ObjectReplicator(conf,
|
|
logger=self.logger)
|
|
was_connector = object_replicator.http_connect
|
|
object_replicator.http_connect = mock_http_connect(200)
|
|
cur_part = '0'
|
|
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
|
|
policy=POLICIES[1])
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects_1, cur_part, data_dir)
|
|
process_arg_checker = []
|
|
ring = replicator.load_object_ring(POLICIES[1])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(int(cur_part))
|
|
if node['ip'] not in _ips()]
|
|
rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
|
|
(node['ip'], cur_part) for node in nodes])
|
|
for node in nodes:
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mods]))
|
|
with _mock_process(process_arg_checker):
|
|
with mock.patch('swift.obj.replicator.whataremyips',
|
|
side_effect=_ips):
|
|
replicator.run_once()
|
|
self.assertFalse(process_errors)
|
|
self.assertFalse(self.logger.get_lines_for_level('error'))
|
|
object_replicator.http_connect = was_connector
|
|
|
|
def test_check_ring(self):
|
|
for pol in POLICIES:
|
|
obj_ring = self.replicator.load_object_ring(pol)
|
|
self.assertTrue(self.replicator.check_ring(obj_ring))
|
|
orig_check = self.replicator.next_check
|
|
self.replicator.next_check = orig_check - 30
|
|
self.assertTrue(self.replicator.check_ring(obj_ring))
|
|
self.replicator.next_check = orig_check
|
|
orig_ring_time = obj_ring._mtime
|
|
obj_ring._mtime = orig_ring_time - 30
|
|
self.assertTrue(self.replicator.check_ring(obj_ring))
|
|
self.replicator.next_check = orig_check - 30
|
|
self.assertFalse(self.replicator.check_ring(obj_ring))
|
|
|
|
def test_collect_jobs_mkdirs_error(self):
|
|
|
|
non_local = {}
|
|
|
|
def blowup_mkdirs(path):
|
|
non_local['path'] = path
|
|
raise OSError('Ow!')
|
|
|
|
with mock.patch.object(object_replicator, 'mkdirs', blowup_mkdirs):
|
|
rmtree(self.objects, ignore_errors=1)
|
|
object_replicator.mkdirs = blowup_mkdirs
|
|
self.replicator.collect_jobs()
|
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
|
'ERROR creating %s: ' % non_local['path']])
|
|
log_args, log_kwargs = self.logger.log_dict['error'][0]
|
|
self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!')
|
|
|
|
def test_collect_jobs(self):
|
|
jobs = self.replicator.collect_jobs()
|
|
jobs_to_delete = [j for j in jobs if j['delete']]
|
|
jobs_by_pol_part = {}
|
|
for job in jobs:
|
|
jobs_by_pol_part[str(int(job['policy'])) + job['partition']] = job
|
|
self.assertEqual(len(jobs_to_delete), 2)
|
|
self.assertTrue('1', jobs_to_delete[0]['partition'])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['00']['nodes']], [1, 2])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['01']['nodes']],
|
|
[1, 2, 3])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['02']['nodes']], [2, 3])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['03']['nodes']], [3, 1])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['10']['nodes']], [1, 2])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['11']['nodes']],
|
|
[1, 2, 3])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
|
|
self.assertEqual(
|
|
[node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
|
|
for part in ['00', '01', '02', '03']:
|
|
for node in jobs_by_pol_part[part]['nodes']:
|
|
self.assertEqual(node['device'], 'sda')
|
|
self.assertEqual(jobs_by_pol_part[part]['path'],
|
|
os.path.join(self.objects, part[1:]))
|
|
for part in ['10', '11', '12', '13']:
|
|
for node in jobs_by_pol_part[part]['nodes']:
|
|
self.assertEqual(node['device'], 'sda')
|
|
self.assertEqual(jobs_by_pol_part[part]['path'],
|
|
os.path.join(self.objects_1, part[1:]))
|
|
|
|
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
|
def test_collect_jobs_multi_disk(self, mock_shuffle):
|
|
devs = [
|
|
# Two disks on same IP/port
|
|
{'id': 0, 'device': 'sda', 'zone': 0,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
{'id': 1, 'device': 'sdb', 'zone': 1,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
# Two disks on same server, different ports
|
|
{'id': 2, 'device': 'sdc', 'zone': 2,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
|
{'id': 3, 'device': 'sdd', 'zone': 4,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
|
]
|
|
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
|
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
|
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
|
_create_test_rings(self.testdir, devs)
|
|
|
|
jobs = self.replicator.collect_jobs()
|
|
|
|
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
|
|
|
jobs_to_delete = [j for j in jobs if j['delete']]
|
|
self.assertEqual(len(jobs_to_delete), 4)
|
|
self.assertEqual([
|
|
'1', '2', # policy 0; 1 not on sda, 2 not on sdb
|
|
'1', '2', # policy 1; 1 not on sda, 2 not on sdb
|
|
], [j['partition'] for j in jobs_to_delete])
|
|
|
|
jobs_by_pol_part_dev = {}
|
|
for job in jobs:
|
|
# There should be no jobs with a device not in just sda & sdb
|
|
self.assertTrue(job['device'] in ('sda', 'sdb'))
|
|
jobs_by_pol_part_dev[
|
|
str(int(job['policy'])) + job['partition'] + job['device']
|
|
] = job
|
|
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['00sda']['nodes']],
|
|
[1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['00sdb']['nodes']],
|
|
[0, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['01sda']['nodes']],
|
|
[1, 2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['01sdb']['nodes']],
|
|
[2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['02sda']['nodes']],
|
|
[2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['02sdb']['nodes']],
|
|
[2, 3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['03sda']['nodes']],
|
|
[3, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['03sdb']['nodes']],
|
|
[3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['10sda']['nodes']],
|
|
[1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['10sdb']['nodes']],
|
|
[0, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['11sda']['nodes']],
|
|
[1, 2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['11sdb']['nodes']],
|
|
[2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['12sda']['nodes']],
|
|
[2, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['12sdb']['nodes']],
|
|
[2, 3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['13sda']['nodes']],
|
|
[3, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['13sdb']['nodes']],
|
|
[3, 0])
|
|
for part in ['00', '01', '02', '03']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sda']['path'],
|
|
os.path.join(self.objects, part[1:]))
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdb']['path'],
|
|
os.path.join(objects_sdb, part[1:]))
|
|
for part in ['10', '11', '12', '13']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sda']['path'],
|
|
os.path.join(self.objects_1, part[1:]))
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdb']['path'],
|
|
os.path.join(objects_1_sdb, part[1:]))
|
|
|
|
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
|
def test_collect_jobs_multi_disk_diff_ports_normal(self, mock_shuffle):
|
|
# Normally (servers_per_port=0), replication_ip AND replication_port
|
|
# are used to determine local ring device entries. Here we show that
|
|
# with bind_ip='127.0.0.1', bind_port=6000, only "sdc" is local.
|
|
devs = [
|
|
# Two disks on same IP/port
|
|
{'id': 0, 'device': 'sda', 'zone': 0,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
{'id': 1, 'device': 'sdb', 'zone': 1,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
# Two disks on same server, different ports
|
|
{'id': 2, 'device': 'sdc', 'zone': 2,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
|
{'id': 3, 'device': 'sdd', 'zone': 4,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
|
]
|
|
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
|
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
|
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
|
_create_test_rings(self.testdir, devs)
|
|
|
|
self.conf['bind_ip'] = '127.0.0.1'
|
|
self._create_replicator()
|
|
|
|
jobs = self.replicator.collect_jobs()
|
|
|
|
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
|
|
|
jobs_to_delete = [j for j in jobs if j['delete']]
|
|
self.assertEqual(len(jobs_to_delete), 2)
|
|
self.assertEqual([
|
|
'3', # policy 0; 3 not on sdc
|
|
'3', # policy 1; 3 not on sdc
|
|
], [j['partition'] for j in jobs_to_delete])
|
|
|
|
jobs_by_pol_part_dev = {}
|
|
for job in jobs:
|
|
# There should be no jobs with a device not sdc
|
|
self.assertEqual(job['device'], 'sdc')
|
|
jobs_by_pol_part_dev[
|
|
str(int(job['policy'])) + job['partition'] + job['device']
|
|
] = job
|
|
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
|
|
[0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
|
|
[1, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
|
|
[3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
|
|
[3, 0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
|
|
[0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
|
|
[1, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
|
|
[3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
|
|
[3, 0, 1])
|
|
for part in ['00', '01', '02', '03']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
|
os.path.join(objects_sdc, part[1:]))
|
|
for part in ['10', '11', '12', '13']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
|
os.path.join(objects_1_sdc, part[1:]))
|
|
|
|
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
|
def test_collect_jobs_multi_disk_servers_per_port(self, mock_shuffle):
|
|
# Normally (servers_per_port=0), replication_ip AND replication_port
|
|
# are used to determine local ring device entries. Here we show that
|
|
# with servers_per_port > 0 and bind_ip='127.0.0.1', bind_port=6000,
|
|
# then both "sdc" and "sdd" are local.
|
|
devs = [
|
|
# Two disks on same IP/port
|
|
{'id': 0, 'device': 'sda', 'zone': 0,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
{'id': 1, 'device': 'sdb', 'zone': 1,
|
|
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
|
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
|
# Two disks on same server, different ports
|
|
{'id': 2, 'device': 'sdc', 'zone': 2,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
|
{'id': 3, 'device': 'sdd', 'zone': 4,
|
|
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
|
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
|
]
|
|
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
|
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
|
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
|
_create_test_rings(self.testdir, devs)
|
|
|
|
self.conf['bind_ip'] = '127.0.0.1'
|
|
self.conf['servers_per_port'] = 1 # diff port ok
|
|
self._create_replicator()
|
|
|
|
jobs = self.replicator.collect_jobs()
|
|
|
|
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
|
|
|
jobs_to_delete = [j for j in jobs if j['delete']]
|
|
self.assertEqual(len(jobs_to_delete), 4)
|
|
self.assertEqual([
|
|
'3', '0', # policy 0; 3 not on sdc, 0 not on sdd
|
|
'3', '0', # policy 1; 3 not on sdc, 0 not on sdd
|
|
], [j['partition'] for j in jobs_to_delete])
|
|
|
|
jobs_by_pol_part_dev = {}
|
|
for job in jobs:
|
|
# There should be no jobs with a device not in just sdc & sdd
|
|
self.assertTrue(job['device'] in ('sdc', 'sdd'))
|
|
jobs_by_pol_part_dev[
|
|
str(int(job['policy'])) + job['partition'] + job['device']
|
|
] = job
|
|
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
|
|
[0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['00sdd']['nodes']],
|
|
[0, 1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
|
|
[1, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['01sdd']['nodes']],
|
|
[1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
|
|
[3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['02sdd']['nodes']],
|
|
[2, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
|
|
[3, 0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['03sdd']['nodes']],
|
|
[0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
|
|
[0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['10sdd']['nodes']],
|
|
[0, 1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
|
|
[1, 3])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['11sdd']['nodes']],
|
|
[1, 2])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
|
|
[3, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['12sdd']['nodes']],
|
|
[2, 0])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
|
|
[3, 0, 1])
|
|
self.assertEqual([node['id']
|
|
for node in jobs_by_pol_part_dev['13sdd']['nodes']],
|
|
[0, 1])
|
|
for part in ['00', '01', '02', '03']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
|
os.path.join(objects_sdc, part[1:]))
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdd']['path'],
|
|
os.path.join(objects_sdd, part[1:]))
|
|
for part in ['10', '11', '12', '13']:
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
|
os.path.join(objects_1_sdc, part[1:]))
|
|
self.assertEqual(jobs_by_pol_part_dev[part + 'sdd']['path'],
|
|
os.path.join(objects_1_sdd, part[1:]))
|
|
|
|
def test_collect_jobs_handoffs_first(self):
|
|
self.replicator.handoffs_first = True
|
|
jobs = self.replicator.collect_jobs()
|
|
self.assertTrue(jobs[0]['delete'])
|
|
self.assertEqual('1', jobs[0]['partition'])
|
|
|
|
def test_handoffs_first_mode_will_process_all_jobs_after_handoffs(self):
|
|
# make a object in the handoff & primary partition
|
|
expected_suffix_paths = []
|
|
for policy in POLICIES:
|
|
# primary
|
|
ts = next(self.ts)
|
|
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', policy)
|
|
with df.create() as w:
|
|
w.write('asdf')
|
|
w.put({'X-Timestamp': ts.internal})
|
|
w.commit(ts)
|
|
expected_suffix_paths.append(os.path.dirname(df._datadir))
|
|
# handoff
|
|
ts = next(self.ts)
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
|
|
with df.create() as w:
|
|
w.write('asdf')
|
|
w.put({'X-Timestamp': ts.internal})
|
|
w.commit(ts)
|
|
expected_suffix_paths.append(os.path.dirname(df._datadir))
|
|
|
|
# rsync will be called for all parts we created objects in
|
|
process_arg_checker = [
|
|
# (return_code, stdout, <each in capture rsync args>)
|
|
(0, '', []),
|
|
(0, '', []),
|
|
(0, '', []), # handoff job "first" policy
|
|
(0, '', []),
|
|
(0, '', []),
|
|
(0, '', []), # handoff job "second" policy
|
|
(0, '', []),
|
|
(0, '', []), # update job "first" policy
|
|
(0, '', []),
|
|
(0, '', []), # update job "second" policy
|
|
]
|
|
# each handoff partition node gets one replicate request for after
|
|
# rsync (2 * 3), each primary partition with objects gets two
|
|
# replicate requests (pre-flight and post sync) to each of each
|
|
# partners (2 * 2 * 2), the 2 remaining empty parts (2 & 3) get a
|
|
# pre-flight replicate request per node for each storage policy
|
|
# (2 * 2 * 2) - so 6 + 8 + 8 == 22
|
|
replicate_responses = [200] * 22
|
|
stub_body = pickle.dumps({})
|
|
with _mock_process(process_arg_checker) as rsync_log, \
|
|
mock.patch('swift.obj.replicator.whataremyips',
|
|
side_effect=_ips), \
|
|
mocked_http_conn(*replicate_responses,
|
|
body=stub_body) as conn_log:
|
|
self.replicator.handoffs_first = True
|
|
self.replicator.replicate()
|
|
# all jobs processed!
|
|
self.assertEqual(self.replicator.job_count,
|
|
self.replicator.replication_count)
|
|
self.assertFalse(self.replicator.handoffs_remaining)
|
|
|
|
# sanity, all the handoffs suffixes we filled in were rsync'd
|
|
found_rsync_suffix_paths = set()
|
|
for subprocess_info in rsync_log:
|
|
local_path, remote_path = subprocess_info['rsync_args'][-2:]
|
|
found_rsync_suffix_paths.add(local_path)
|
|
self.assertEqual(set(expected_suffix_paths), found_rsync_suffix_paths)
|
|
# sanity, all nodes got replicated
|
|
found_replicate_calls = defaultdict(int)
|
|
for req in conn_log.requests:
|
|
self.assertEqual(req['method'], 'REPLICATE')
|
|
found_replicate_key = (
|
|
int(req['headers']['X-Backend-Storage-Policy-Index']),
|
|
req['path'])
|
|
found_replicate_calls[found_replicate_key] += 1
|
|
expected_replicate_calls = {
|
|
(0, '/sda/1/a83'): 3,
|
|
(1, '/sda/1/a83'): 3,
|
|
(0, '/sda/0'): 2,
|
|
(0, '/sda/0/a83'): 2,
|
|
(1, '/sda/0'): 2,
|
|
(1, '/sda/0/a83'): 2,
|
|
(0, '/sda/2'): 2,
|
|
(1, '/sda/2'): 2,
|
|
(0, '/sda/3'): 2,
|
|
(1, '/sda/3'): 2,
|
|
}
|
|
self.assertEqual(dict(found_replicate_calls),
|
|
expected_replicate_calls)
|
|
|
|
def test_handoffs_first_mode_will_abort_if_handoffs_remaining(self):
|
|
# make an object in the handoff partition
|
|
handoff_suffix_paths = []
|
|
for policy in POLICIES:
|
|
ts = next(self.ts)
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o', policy)
|
|
with df.create() as w:
|
|
w.write('asdf')
|
|
w.put({'X-Timestamp': ts.internal})
|
|
w.commit(ts)
|
|
handoff_suffix_paths.append(os.path.dirname(df._datadir))
|
|
process_arg_checker = [
|
|
# (return_code, stdout, <each in capture rsync args>)
|
|
(0, '', []),
|
|
(1, '', []),
|
|
(0, '', []),
|
|
(0, '', []),
|
|
(0, '', []),
|
|
(0, '', []),
|
|
]
|
|
stub_body = pickle.dumps({})
|
|
with _mock_process(process_arg_checker) as rsync_log, \
|
|
mock.patch('swift.obj.replicator.whataremyips',
|
|
side_effect=_ips), \
|
|
mocked_http_conn(*[200] * 5, body=stub_body) as conn_log:
|
|
self.replicator.handoffs_first = True
|
|
self.replicator.replicate()
|
|
# stopped after handoffs!
|
|
self.assertEqual(1, self.replicator.handoffs_remaining)
|
|
self.assertEqual(8, self.replicator.job_count)
|
|
# in addition to the two update_deleted jobs as many as "concurrency"
|
|
# jobs may have been spawned into the pool before the failed
|
|
# update_deleted job incremented handoffs_remaining and caused the
|
|
# handoffs_first check to abort the current pass
|
|
self.assertLessEqual(self.replicator.replication_count,
|
|
2 + self.replicator.concurrency)
|
|
|
|
# sanity, all the handoffs suffixes we filled in were rsync'd
|
|
found_rsync_suffix_paths = set()
|
|
expected_replicate_requests = set()
|
|
for subprocess_info in rsync_log:
|
|
local_path, remote_path = subprocess_info['rsync_args'][-2:]
|
|
found_rsync_suffix_paths.add(local_path)
|
|
if subprocess_info['ret_code'] == 0:
|
|
node_ip = remote_path.split(':', 1)[0]
|
|
expected_replicate_requests.add(node_ip)
|
|
self.assertEqual(set(handoff_suffix_paths), found_rsync_suffix_paths)
|
|
# sanity, all successful rsync nodes got REPLICATE requests
|
|
found_replicate_requests = set()
|
|
self.assertEqual(5, len(conn_log.requests))
|
|
for req in conn_log.requests:
|
|
self.assertEqual(req['method'], 'REPLICATE')
|
|
found_replicate_requests.add(req['ip'])
|
|
self.assertEqual(expected_replicate_requests,
|
|
found_replicate_requests)
|
|
|
|
# and at least one partition got removed!
|
|
remaining_policies = []
|
|
for path in handoff_suffix_paths:
|
|
if os.path.exists(path):
|
|
policy = diskfile.extract_policy(path)
|
|
remaining_policies.append(policy)
|
|
self.assertEqual(len(remaining_policies), 1)
|
|
remaining_policy = remaining_policies[0]
|
|
|
|
# try again but with handoff_delete allowing for a single failure
|
|
with _mock_process(process_arg_checker) as rsync_log, \
|
|
mock.patch('swift.obj.replicator.whataremyips',
|
|
side_effect=_ips), \
|
|
mocked_http_conn(*[200] * 14, body=stub_body) as conn_log:
|
|
self.replicator.handoff_delete = 2
|
|
self.replicator.replicate()
|
|
# all jobs processed!
|
|
self.assertEqual(self.replicator.job_count,
|
|
self.replicator.replication_count)
|
|
self.assertFalse(self.replicator.handoffs_remaining)
|
|
# sanity, all parts got replicated
|
|
found_replicate_calls = defaultdict(int)
|
|
for req in conn_log.requests:
|
|
self.assertEqual(req['method'], 'REPLICATE')
|
|
found_replicate_key = (
|
|
int(req['headers']['X-Backend-Storage-Policy-Index']),
|
|
req['path'])
|
|
found_replicate_calls[found_replicate_key] += 1
|
|
expected_replicate_calls = {
|
|
(int(remaining_policy), '/sda/1/a83'): 2,
|
|
(0, '/sda/0'): 2,
|
|
(1, '/sda/0'): 2,
|
|
(0, '/sda/2'): 2,
|
|
(1, '/sda/2'): 2,
|
|
(0, '/sda/3'): 2,
|
|
(1, '/sda/3'): 2,
|
|
}
|
|
self.assertEqual(dict(found_replicate_calls),
|
|
expected_replicate_calls)
|
|
|
|
# and now all handoff partitions have been rebalanced away!
|
|
removed_paths = set()
|
|
for path in handoff_suffix_paths:
|
|
if not os.path.exists(path):
|
|
removed_paths.add(path)
|
|
self.assertEqual(removed_paths, set(handoff_suffix_paths))
|
|
|
|
def test_replicator_skips_bogus_partition_dirs(self):
|
|
# A directory in the wrong place shouldn't crash the replicator
|
|
rmtree(self.objects)
|
|
rmtree(self.objects_1)
|
|
os.mkdir(self.objects)
|
|
os.mkdir(self.objects_1)
|
|
|
|
os.mkdir(os.path.join(self.objects, "burrito"))
|
|
jobs = self.replicator.collect_jobs()
|
|
self.assertEqual(len(jobs), 0)
|
|
|
|
def test_replicator_removes_zbf(self):
|
|
# After running xfs_repair, a partition directory could become a
|
|
# zero-byte file. If this happens, the replicator should clean it
|
|
# up, log something, and move on to the next partition.
|
|
|
|
# Surprise! Partition dir 1 is actually a zero-byte file.
|
|
pol_0_part_1_path = os.path.join(self.objects, '1')
|
|
rmtree(pol_0_part_1_path)
|
|
with open(pol_0_part_1_path, 'w'):
|
|
pass
|
|
self.assertTrue(os.path.isfile(pol_0_part_1_path)) # sanity check
|
|
|
|
# Policy 1's partition dir 1 is also a zero-byte file.
|
|
pol_1_part_1_path = os.path.join(self.objects_1, '1')
|
|
rmtree(pol_1_part_1_path)
|
|
with open(pol_1_part_1_path, 'w'):
|
|
pass
|
|
self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
|
|
|
|
# Don't delete things in collect_jobs(); all the stat() calls would
|
|
# make replicator startup really slow.
|
|
self.replicator.collect_jobs()
|
|
self.assertTrue(os.path.exists(pol_0_part_1_path))
|
|
self.assertTrue(os.path.exists(pol_1_part_1_path))
|
|
|
|
# After a replication pass, the files should be gone
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.run_once()
|
|
|
|
self.assertFalse(os.path.exists(pol_0_part_1_path))
|
|
self.assertFalse(os.path.exists(pol_1_part_1_path))
|
|
self.assertEqual(
|
|
sorted(self.logger.get_lines_for_level('warning')), [
|
|
('Removing partition directory which was a file: %s'
|
|
% pol_1_part_1_path),
|
|
('Removing partition directory which was a file: %s'
|
|
% pol_0_part_1_path),
|
|
])
|
|
|
|
def test_delete_partition(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_default_sync_method(self):
|
|
self.replicator.conf.pop('sync_method')
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_ssync_single_region(self):
|
|
devs = [
|
|
{'id': 0, 'device': 'sda', 'zone': 0,
|
|
'region': 1, 'ip': '127.0.0.0', 'port': 6000},
|
|
{'id': 1, 'device': 'sda', 'zone': 1,
|
|
'region': 1, 'ip': '127.0.0.1', 'port': 6000},
|
|
{'id': 2, 'device': 'sda', 'zone': 2,
|
|
'region': 1, 'ip': '127.0.0.2', 'port': 6000},
|
|
{'id': 3, 'device': 'sda', 'zone': 4,
|
|
'region': 1, 'ip': '127.0.0.3', 'port': 6000},
|
|
{'id': 4, 'device': 'sda', 'zone': 5,
|
|
'region': 1, 'ip': '127.0.0.4', 'port': 6000},
|
|
{'id': 5, 'device': 'sda', 'zone': 6,
|
|
'region': 1, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
|
|
{'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
|
|
'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
|
|
]
|
|
_create_test_rings(self.testdir, devs=devs)
|
|
self.conf['sync_method'] = 'ssync'
|
|
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
|
self.replicator.logger = debug_logger()
|
|
self.replicator._zero_stats()
|
|
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
ts = normalize_timestamp(time.time())
|
|
f = open(os.path.join(df._datadir, ts + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
def _fake_ssync(node, job, suffixes, **kwargs):
|
|
return True, {ohash: ts}
|
|
|
|
self.replicator.sync_method = _fake_ssync
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_1(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES[1])
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects_1, '1', data_dir)
|
|
part_path = os.path.join(self.objects_1, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[1])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects-1/%s' % (node['ip'], 1)
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_with_failures(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for i, node in enumerate(nodes):
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
if i == 0:
|
|
# force one of the rsync calls to fail
|
|
ret_code = 1
|
|
else:
|
|
ret_code = 0
|
|
process_arg_checker.append(
|
|
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
# The path should still exist
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_with_handoff_delete(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.handoff_delete = 2
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for i, node in enumerate(nodes):
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
if i == 0:
|
|
# force one of the rsync calls to fail
|
|
ret_code = 1
|
|
else:
|
|
ret_code = 0
|
|
process_arg_checker.append(
|
|
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_with_handoff_delete_failures(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.handoff_delete = 2
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for i, node in enumerate(nodes):
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
if i in (0, 1):
|
|
# force two of the rsync calls to fail
|
|
ret_code = 1
|
|
else:
|
|
ret_code = 0
|
|
process_arg_checker.append(
|
|
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
ring = self.replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(1)
|
|
if node['ip'] not in _ips()]
|
|
process_arg_checker = []
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
|
if node['region'] != 1:
|
|
# the rsync calls for other region to fail
|
|
ret_code = 1
|
|
else:
|
|
ret_code = 0
|
|
process_arg_checker.append(
|
|
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
|
with _mock_process(process_arg_checker):
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_partition_override_params(self):
|
|
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate(override_devices=['sdb'])
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate(override_partitions=['9'])
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate(override_devices=['sda'],
|
|
override_partitions=['1'])
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_delete_policy_override_params(self):
|
|
df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
df1 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
|
|
policy=POLICIES[1])
|
|
mkdirs(df0._datadir)
|
|
mkdirs(df1._datadir)
|
|
|
|
pol0_part_path = os.path.join(self.objects, '99')
|
|
pol1_part_path = os.path.join(self.objects_1, '99')
|
|
|
|
# sanity checks
|
|
self.assertTrue(os.access(pol0_part_path, os.F_OK))
|
|
self.assertTrue(os.access(pol1_part_path, os.F_OK))
|
|
|
|
# a bogus policy index doesn't bother the replicator any more than a
|
|
# bogus device or partition does
|
|
self.replicator.run_once(policies='1,2,5')
|
|
|
|
self.assertFalse(os.access(pol1_part_path, os.F_OK))
|
|
self.assertTrue(os.access(pol0_part_path, os.F_OK))
|
|
|
|
def test_delete_partition_ssync(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
ts = normalize_timestamp(time.time())
|
|
f = open(os.path.join(df._datadir, ts + '.data'),
|
|
'wb')
|
|
f.write('0')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
self.call_nums = 0
|
|
self.conf['sync_method'] = 'ssync'
|
|
|
|
def _fake_ssync(node, job, suffixes, **kwargs):
|
|
success = True
|
|
ret_val = {ohash: ts}
|
|
if self.call_nums == 2:
|
|
# ssync should return (True, []) only when the second
|
|
# candidate node has not get the replica yet.
|
|
success = False
|
|
ret_val = {}
|
|
self.call_nums += 1
|
|
return success, ret_val
|
|
|
|
self.replicator.sync_method = _fake_ssync
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate()
|
|
# The file should be deleted at the second replicate call
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate()
|
|
# The partition should be deleted at the third replicate call
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
del self.call_nums
|
|
|
|
def test_delete_partition_ssync_with_sync_failure(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
ts = normalize_timestamp(time.time())
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
|
|
f.write('0')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.call_nums = 0
|
|
self.conf['sync_method'] = 'ssync'
|
|
|
|
def _fake_ssync(node, job, suffixes, **kwags):
|
|
success = False
|
|
ret_val = {}
|
|
if self.call_nums == 2:
|
|
# ssync should return (True, []) only when the second
|
|
# candidate node has not get the replica yet.
|
|
success = True
|
|
ret_val = {ohash: ts}
|
|
self.call_nums += 1
|
|
return success, ret_val
|
|
|
|
self.replicator.sync_method = _fake_ssync
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
del self.call_nums
|
|
|
|
def test_delete_objs_ssync_only_when_in_sync(self):
|
|
self.replicator.logger = debug_logger('test-replicator')
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
ts = normalize_timestamp(time.time())
|
|
f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
|
|
f.write('0')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.call_nums = 0
|
|
self.conf['sync_method'] = 'ssync'
|
|
|
|
in_sync_objs = {}
|
|
|
|
def _fake_ssync(node, job, suffixes, remote_check_objs=None):
|
|
self.call_nums += 1
|
|
if remote_check_objs is None:
|
|
# sync job
|
|
ret_val = {ohash: ts}
|
|
else:
|
|
ret_val = in_sync_objs
|
|
return True, ret_val
|
|
|
|
self.replicator.sync_method = _fake_ssync
|
|
self.replicator.replicate()
|
|
self.assertEqual(3, self.call_nums)
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
del self.call_nums
|
|
|
|
def test_delete_partition_ssync_with_cleanup_failure(self):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.logger = mock_logger = \
|
|
debug_logger('test-replicator')
|
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
ts = normalize_timestamp(time.time())
|
|
f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
|
|
f.write('0')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
|
part_path = os.path.join(self.objects, '1')
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
self.call_nums = 0
|
|
self.conf['sync_method'] = 'ssync'
|
|
|
|
def _fake_ssync(node, job, suffixes, **kwargs):
|
|
success = True
|
|
ret_val = {ohash: ts}
|
|
if self.call_nums == 2:
|
|
# ssync should return (True, []) only when the second
|
|
# candidate node has not get the replica yet.
|
|
success = False
|
|
ret_val = {}
|
|
self.call_nums += 1
|
|
return success, ret_val
|
|
|
|
rmdir_func = os.rmdir
|
|
|
|
def raise_exception_rmdir(exception_class, error_no):
|
|
instance = exception_class()
|
|
instance.errno = error_no
|
|
|
|
def func(directory):
|
|
if directory == suffix_dir_path:
|
|
raise instance
|
|
else:
|
|
rmdir_func(directory)
|
|
|
|
return func
|
|
|
|
self.replicator.sync_method = _fake_ssync
|
|
self.replicator.replicate()
|
|
# The file should still exist
|
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
# Fail with ENOENT
|
|
with mock.patch('os.rmdir',
|
|
raise_exception_rmdir(OSError, ENOENT)):
|
|
self.replicator.replicate()
|
|
self.assertFalse(mock_logger.get_lines_for_level('error'))
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
# Fail with ENOTEMPTY
|
|
with mock.patch('os.rmdir',
|
|
raise_exception_rmdir(OSError, ENOTEMPTY)):
|
|
self.replicator.replicate()
|
|
self.assertFalse(mock_logger.get_lines_for_level('error'))
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
# Fail with ENOTDIR
|
|
with mock.patch('os.rmdir',
|
|
raise_exception_rmdir(OSError, ENOTDIR)):
|
|
self.replicator.replicate()
|
|
self.assertEqual(len(mock_logger.get_lines_for_level('error')), 1)
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
|
|
# Finally we can cleanup everything
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertTrue(os.access(part_path, os.F_OK))
|
|
self.replicator.replicate()
|
|
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
|
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
|
self.assertFalse(os.access(part_path, os.F_OK))
|
|
|
|
def test_run_once_recover_from_failure(self):
|
|
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
|
bind_ip=_ips()[0],
|
|
mount_check='false', timeout='300', stats_interval='1')
|
|
replicator = object_replicator.ObjectReplicator(conf)
|
|
was_connector = object_replicator.http_connect
|
|
try:
|
|
object_replicator.http_connect = mock_http_connect(200)
|
|
# Write some files into '1' and run replicate- they should be moved
|
|
# to the other partitions and then node should get deleted.
|
|
cur_part = '1'
|
|
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
|
|
ring = replicator.load_object_ring(POLICIES[0])
|
|
process_arg_checker = []
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(int(cur_part))
|
|
if node['ip'] not in _ips()]
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
|
|
cur_part)
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
|
self.assertTrue(os.access(os.path.join(self.objects,
|
|
'1', data_dir, ohash),
|
|
os.F_OK))
|
|
with _mock_process(process_arg_checker):
|
|
replicator.run_once()
|
|
self.assertFalse(process_errors)
|
|
for i, result in [('0', True), ('1', False),
|
|
('2', True), ('3', True)]:
|
|
self.assertEqual(os.access(
|
|
os.path.join(self.objects,
|
|
i, diskfile.HASH_FILE),
|
|
os.F_OK), result)
|
|
finally:
|
|
object_replicator.http_connect = was_connector
|
|
|
|
def test_run_once_recover_from_timeout(self):
|
|
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
|
bind_ips=_ips()[0],
|
|
mount_check='false', timeout='300', stats_interval='1')
|
|
replicator = object_replicator.ObjectReplicator(conf)
|
|
was_connector = object_replicator.http_connect
|
|
was_get_hashes = object_replicator.DiskFileManager._get_hashes
|
|
was_execute = tpool.execute
|
|
self.get_hash_count = 0
|
|
try:
|
|
|
|
def fake_get_hashes(*args, **kwargs):
|
|
self.get_hash_count += 1
|
|
if self.get_hash_count == 3:
|
|
# raise timeout on last call to get hashes
|
|
raise Timeout()
|
|
return 2, {'abc': 'def'}
|
|
|
|
def fake_exc(tester, *args, **kwargs):
|
|
if 'Error syncing partition' in args[0]:
|
|
tester.i_failed = True
|
|
|
|
self.i_failed = False
|
|
object_replicator.http_connect = mock_http_connect(200)
|
|
object_replicator.DiskFileManager._get_hashes = fake_get_hashes
|
|
replicator.logger.exception = \
|
|
lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
|
|
# Write some files into '1' and run replicate- they should be moved
|
|
# to the other partitions and then node should get deleted.
|
|
cur_part = '1'
|
|
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
mkdirs(df._datadir)
|
|
f = open(os.path.join(df._datadir,
|
|
normalize_timestamp(time.time()) + '.data'),
|
|
'wb')
|
|
f.write('1234567890')
|
|
f.close()
|
|
ohash = hash_path('a', 'c', 'o')
|
|
data_dir = ohash[-3:]
|
|
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
|
|
process_arg_checker = []
|
|
ring = replicator.load_object_ring(POLICIES[0])
|
|
nodes = [node for node in
|
|
ring.get_part_nodes(int(cur_part))
|
|
if node['ip'] not in _ips()]
|
|
|
|
for node in nodes:
|
|
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
|
|
cur_part)
|
|
process_arg_checker.append(
|
|
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
|
self.assertTrue(os.access(os.path.join(self.objects,
|
|
'1', data_dir, ohash),
|
|
os.F_OK))
|
|
with _mock_process(process_arg_checker):
|
|
replicator.run_once()
|
|
self.assertFalse(process_errors)
|
|
self.assertFalse(self.i_failed)
|
|
finally:
|
|
object_replicator.http_connect = was_connector
|
|
object_replicator.DiskFileManager._get_hashes = was_get_hashes
|
|
tpool.execute = was_execute
|
|
|
|
def test_run(self):
|
|
with _mock_process([(0, '')] * 100):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.replicate()
|
|
|
|
def test_run_withlog(self):
|
|
with _mock_process([(0, "stuff in log")] * 100):
|
|
with mock.patch('swift.obj.replicator.http_connect',
|
|
mock_http_connect(200)):
|
|
self.replicator.replicate()
|
|
|
|
def test_sync_just_calls_sync_method(self):
|
|
self.replicator.sync_method = mock.MagicMock()
|
|
self.replicator.sync('node', 'job', 'suffixes')
|
|
self.replicator.sync_method.assert_called_once_with(
|
|
'node', 'job', 'suffixes')
|
|
|
|
@mock.patch('swift.obj.replicator.tpool_reraise', autospec=True)
|
|
@mock.patch('swift.obj.replicator.http_connect', autospec=True)
|
|
def test_update(self, mock_http, mock_tpool_reraise):
|
|
|
|
def set_default(self):
|
|
self.replicator.suffix_count = 0
|
|
self.replicator.suffix_sync = 0
|
|
self.replicator.suffix_hash = 0
|
|
self.replicator.replication_count = 0
|
|
self.replicator.partition_times = []
|
|
|
|
self.headers = {'Content-Length': '0',
|
|
'user-agent': 'object-replicator %s' % os.getpid()}
|
|
self.replicator.logger = mock_logger = mock.MagicMock()
|
|
mock_tpool_reraise.return_value = (0, {})
|
|
|
|
all_jobs = self.replicator.collect_jobs()
|
|
jobs = [job for job in all_jobs if not job['delete']]
|
|
|
|
mock_http.return_value = answer = mock.MagicMock()
|
|
answer.getresponse.return_value = resp = mock.MagicMock()
|
|
# Check uncorrect http_connect with status 507 and
|
|
# count of attempts and call args
|
|
resp.status = 507
|
|
error = '%(ip)s/%(device)s responded as unmounted'
|
|
expect = 'Error syncing partition'
|
|
for job in jobs:
|
|
set_default(self)
|
|
ring = job['policy'].object_ring
|
|
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
|
self.replicator.update(job)
|
|
self.assertTrue(error in mock_logger.error.call_args[0][0])
|
|
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
|
|
self.assertEqual(len(self.replicator.partition_times), 1)
|
|
self.assertEqual(mock_http.call_count, len(ring._devs) - 1)
|
|
reqs = []
|
|
for node in job['nodes']:
|
|
reqs.append(mock.call(node['ip'], node['port'], node['device'],
|
|
job['partition'], 'REPLICATE', '',
|
|
headers=self.headers))
|
|
if job['partition'] == '0':
|
|
self.assertEqual(self.replicator.suffix_hash, 0)
|
|
mock_http.assert_has_calls(reqs, any_order=True)
|
|
mock_http.reset_mock()
|
|
mock_logger.reset_mock()
|
|
|
|
# Check uncorrect http_connect with status 400 != HTTP_OK
|
|
resp.status = 400
|
|
error = 'Invalid response %(resp)s from %(ip)s'
|
|
for job in jobs:
|
|
set_default(self)
|
|
self.replicator.update(job)
|
|
self.assertTrue(error in mock_logger.error.call_args[0][0])
|
|
self.assertEqual(len(self.replicator.partition_times), 1)
|
|
mock_logger.reset_mock()
|
|
|
|
# Check successful http_connection and exception with
|
|
# uncorrect pickle.loads(resp.read())
|
|
resp.status = 200
|
|
expect = 'Error syncing with node:'
|
|
for job in jobs:
|
|
set_default(self)
|
|
self.replicator.update(job)
|
|
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
|
|
self.assertEqual(len(self.replicator.partition_times), 1)
|
|
mock_logger.reset_mock()
|
|
|
|
# Check successful http_connection and correct
|
|
# pickle.loads(resp.read()) for non local node
|
|
resp.status = 200
|
|
local_job = None
|
|
resp.read.return_value = pickle.dumps({})
|
|
for job in jobs:
|
|
set_default(self)
|
|
# limit local job to policy 0 for simplicity
|
|
if job['partition'] == '0' and int(job['policy']) == 0:
|
|
local_job = job.copy()
|
|
continue
|
|
self.replicator.update(job)
|
|
self.assertEqual(mock_logger.exception.call_count, 0)
|
|
self.assertEqual(mock_logger.error.call_count, 0)
|
|
self.assertEqual(len(self.replicator.partition_times), 1)
|
|
self.assertEqual(self.replicator.suffix_hash, 0)
|
|
self.assertEqual(self.replicator.suffix_sync, 0)
|
|
self.assertEqual(self.replicator.suffix_count, 0)
|
|
mock_logger.reset_mock()
|
|
|
|
# Check successful http_connect and sync for local node
|
|
mock_tpool_reraise.return_value = (1, {'a83': 'ba47fd314242ec8c'
|
|
'7efb91f5d57336e4'})
|
|
resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
|
|
'ada0f4eee69494ff'})
|
|
set_default(self)
|
|
self.replicator.sync = fake_func = \
|
|
mock.MagicMock(return_value=(True, []))
|
|
self.replicator.update(local_job)
|
|
reqs = []
|
|
for node in local_job['nodes']:
|
|
reqs.append(mock.call(node, local_job, ['a83']))
|
|
fake_func.assert_has_calls(reqs, any_order=True)
|
|
self.assertEqual(fake_func.call_count, 2)
|
|
self.assertEqual(self.replicator.replication_count, 1)
|
|
self.assertEqual(self.replicator.suffix_sync, 2)
|
|
self.assertEqual(self.replicator.suffix_hash, 1)
|
|
self.assertEqual(self.replicator.suffix_count, 1)
|
|
|
|
# Efficient Replication Case
|
|
set_default(self)
|
|
self.replicator.sync = fake_func = \
|
|
mock.MagicMock(return_value=(True, []))
|
|
all_jobs = self.replicator.collect_jobs()
|
|
job = None
|
|
for tmp in all_jobs:
|
|
if tmp['partition'] == '3':
|
|
job = tmp
|
|
break
|
|
# The candidate nodes to replicate (i.e. dev1 and dev3)
|
|
# belong to another region
|
|
self.replicator.update(job)
|
|
self.assertEqual(fake_func.call_count, 1)
|
|
self.assertEqual(self.replicator.replication_count, 1)
|
|
self.assertEqual(self.replicator.suffix_sync, 1)
|
|
self.assertEqual(self.replicator.suffix_hash, 1)
|
|
self.assertEqual(self.replicator.suffix_count, 1)
|
|
|
|
mock_http.reset_mock()
|
|
mock_logger.reset_mock()
|
|
|
|
# test for replication params on policy 0 only
|
|
repl_job = local_job.copy()
|
|
for node in repl_job['nodes']:
|
|
node['replication_ip'] = '127.0.0.11'
|
|
node['replication_port'] = '6011'
|
|
set_default(self)
|
|
# with only one set of headers make sure we specify index 0 here
|
|
# as otherwise it may be different from earlier tests
|
|
self.headers['X-Backend-Storage-Policy-Index'] = 0
|
|
self.replicator.update(repl_job)
|
|
reqs = []
|
|
for node in repl_job['nodes']:
|
|
reqs.append(mock.call(node['replication_ip'],
|
|
node['replication_port'], node['device'],
|
|
repl_job['partition'], 'REPLICATE',
|
|
'', headers=self.headers))
|
|
reqs.append(mock.call(node['replication_ip'],
|
|
node['replication_port'], node['device'],
|
|
repl_job['partition'], 'REPLICATE',
|
|
'/a83', headers=self.headers))
|
|
mock_http.assert_has_calls(reqs, any_order=True)
|
|
|
|
def test_rsync_compress_different_region(self):
|
|
self.assertEqual(self.replicator.sync_method, self.replicator.rsync)
|
|
jobs = self.replicator.collect_jobs()
|
|
_m_rsync = mock.Mock(return_value=0)
|
|
_m_os_path_exists = mock.Mock(return_value=True)
|
|
with mock.patch.object(self.replicator, '_rsync', _m_rsync):
|
|
with mock.patch('os.path.exists', _m_os_path_exists):
|
|
for job in jobs:
|
|
self.assertTrue('region' in job)
|
|
for node in job['nodes']:
|
|
for rsync_compress in (True, False):
|
|
self.replicator.rsync_compress = rsync_compress
|
|
ret = \
|
|
self.replicator.sync(node, job,
|
|
['fake_suffix'])
|
|
self.assertTrue(ret)
|
|
if node['region'] != job['region']:
|
|
if rsync_compress:
|
|
# --compress arg should be passed to rsync
|
|
# binary only when rsync_compress option is
|
|
# enabled AND destination node is in a
|
|
# different region
|
|
self.assertTrue('--compress' in
|
|
_m_rsync.call_args[0][0])
|
|
else:
|
|
self.assertFalse('--compress' in
|
|
_m_rsync.call_args[0][0])
|
|
else:
|
|
self.assertFalse('--compress' in
|
|
_m_rsync.call_args[0][0])
|
|
self.assertEqual(
|
|
_m_os_path_exists.call_args_list[-1][0][0],
|
|
os.path.join(job['path'], 'fake_suffix'))
|
|
self.assertEqual(
|
|
_m_os_path_exists.call_args_list[-2][0][0],
|
|
os.path.join(job['path']))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|