Merge "Efficient Replication for Distributed Regions"

commit 8ff2f4378b
Jenkins 2015-02-13 00:18:12 +00:00, committed by Gerrit Code Review
4 changed files with 509 additions and 51 deletions

swift/obj/replicator.py

@@ -14,7 +14,8 @@
# limitations under the License.
import os
from os.path import isdir, isfile, join
import errno
from os.path import isdir, isfile, join, dirname
import random
import shutil
import time
@@ -31,7 +32,7 @@ from swift.common.ring.utils import is_local_device
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, ismount, \
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
tpool_reraise, config_auto_int_value
tpool_reraise, config_auto_int_value, storage_directory
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@@ -95,7 +96,8 @@ class ObjectReplicator(Daemon):
conf.get('handoff_delete', 'auto'), 0)
self._diskfile_mgr = DiskFileManager(conf, self.logger)
def sync(self, node, job, suffixes): # Just exists for doc anchor point
# Just exists for doc anchor point
def sync(self, node, job, suffixes, *args, **kwargs):
"""
Synchronize local suffix directories from a partition with a remote
node.
@@ -106,7 +108,7 @@
:returns: a 2-tuple (success, can_delete_objs); success is a boolean
    and can_delete_objs is an iterable of hashes in sync with the remote
"""
return self.sync_method(node, job, suffixes)
return self.sync_method(node, job, suffixes, *args, **kwargs)
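
The dispatch above is configuration-driven: sync_method is bound from the sync_method conf option (compare the self.conf.get('sync_method') check in update_deleted below), so sync() forwards to either rsync() or ssync(). A minimal sketch of the new return contract, with hypothetical stubs standing in for the real methods:

    def rsync_stub(node, job, suffixes):
        # rsync cannot report per-object state, so it pairs its
        # success flag with an empty candidate set
        return True, set()

    def ssync_stub(node, job, suffixes, remote_check_objs=None):
        # ssync reports which object hashes are in sync on the remote
        # node; those become deletion candidates for handoff cleanup
        return True, set(['9d41d8cd98f00b204e9800998ecf0abc'])

    # both backends now unpack uniformly at the call site
    success, cand_objs = rsync_stub(None, None, ['abc'])
    assert success and cand_objs == set()
    success, cand_objs = ssync_stub(None, None, ['abc'])
    assert success and len(cand_objs) == 1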
def get_object_ring(self, policy_idx):
"""
@@ -168,7 +170,7 @@
sync method in Swift.
"""
if not os.path.exists(job['path']):
return False
return False, set()
args = [
'rsync',
'--recursive',
@@ -193,14 +195,15 @@
args.append(spath)
had_any = True
if not had_any:
return False
return False, set()
data_dir = get_data_dir(job['policy_idx'])
args.append(join(rsync_module, node['device'],
data_dir, job['partition']))
return self._rsync(args) == 0
return self._rsync(args) == 0, set()
def ssync(self, node, job, suffixes):
return ssync_sender.Sender(self, node, job, suffixes)()
def ssync(self, node, job, suffixes, remote_check_objs=None):
return ssync_sender.Sender(
self, node, job, suffixes, remote_check_objs)()
def check_ring(self, object_ring):
"""
@@ -233,9 +236,18 @@
try:
responses = []
suffixes = tpool.execute(tpool_get_suffixes, job['path'])
synced_remote_regions = {}
delete_objs = None
if suffixes:
for node in job['nodes']:
success = self.sync(node, job, suffixes)
kwargs = {}
if node['region'] in synced_remote_regions and \
self.conf.get('sync_method') == 'ssync':
kwargs['remote_check_objs'] = \
synced_remote_regions[node['region']]
# cand_objs is a set of object hashes that are candidates for deletion
success, cand_objs = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
conn = http_connect(
@@ -244,7 +256,14 @@
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), headers=self.headers)
conn.getresponse().read()
if node['region'] != job['region'] and cand_objs:
synced_remote_regions[node['region']] = cand_objs
responses.append(success)
for region, cand_objs in synced_remote_regions.iteritems():
if delete_objs is None:
delete_objs = cand_objs
else:
delete_objs = delete_objs.intersection(cand_objs)
if self.handoff_delete:
# delete handoff if we have had handoff_delete successes
delete_handoff = len([resp for resp in responses if resp]) >= \
@@ -254,14 +273,37 @@
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if not suffixes or delete_handoff:
self.logger.info(_("Removing partition: %s"), job['path'])
tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
if delete_objs:
self.logger.info(_("Removing %s objecs"),
len(delete_objs))
delete_objs = [
storage_directory(job['obj_path'],
job['partition'],
object_hash)
for object_hash in delete_objs]
self.delete_handoff_paths(delete_objs)
else:
self.logger.info(_("Removing partition: %s"), job['path'])
tpool.execute(
shutil.rmtree, job['path'], ignore_errors=True)
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
finally:
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.delete.timing', begin)
def delete_handoff_paths(self, paths):
for object_path in paths:
tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
suffix_dir = dirname(object_path)
try:
os.rmdir(suffix_dir)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
self.logger.exception(
"Unexpected error trying to cleanup suffix dir:%r",
suffix_dir)
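
A worked sketch of the deletion decision above, with hypothetical hashes: an object may only be removed from the local handoff once every synced remote region reports it in sync, hence the running intersection; each surviving hash is then expanded with storage_directory() into an objects/<partition>/<suffix>/<hash> path and handed to delete_handoff_paths(), which also prunes emptied suffix dirs:

    # candidate hashes reported back per remote region (hypothetical)
    synced_remote_regions = {
        2: set(['hash_a', 'hash_b']),  # region 2 holds both objects
        3: set(['hash_a']),            # region 3 holds only hash_a
    }

    delete_objs = None
    for region, cand_objs in synced_remote_regions.iteritems():  # Python 2
        if delete_objs is None:
            delete_objs = cand_objs
        else:
            delete_objs = delete_objs.intersection(cand_objs)

    # only hash_a is safe to delete from the local handoff
    assert delete_objs == set(['hash_a'])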
def update(self, job):
"""
High-level method that replicates a single partition.
@@ -280,6 +322,8 @@ class ObjectReplicator(Daemon):
self.suffix_hash += hashed
self.logger.update_stats('suffix.hashes', hashed)
attempts_left = len(job['nodes'])
synced_remote_regions = set()
random.shuffle(job['nodes'])
nodes = itertools.chain(
job['nodes'],
job['object_ring'].get_more_nodes(int(job['partition'])))
@@ -287,6 +331,10 @@ class ObjectReplicator(Daemon):
# If this throws StopIteration it will be caught way below
node = next(nodes)
attempts_left -= 1
# if we have already synced to this remote region,
# don't sync again on this replication pass
if node['region'] in synced_remote_regions:
continue
try:
with Timeout(self.http_timeout):
resp = http_connect(
@@ -320,7 +368,7 @@
suffixes = [suffix for suffix in local_hash if
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
self.sync(node, job, suffixes)
success, _junk = self.sync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
@@ -328,6 +376,9 @@
'/' + '-'.join(suffixes),
headers=self.headers)
conn.getresponse().read()
# only record the remote region when replication succeeded
if success and node['region'] != job['region']:
synced_remote_regions.add(node['region'])
self.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
except (Exception, Timeout):
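
The hunk above gives update() the same region awareness as the handoff path: within one pass over a partition's nodes, at most one node per remote region is synced. A small sketch with hypothetical nodes:

    synced_remote_regions = set()
    job_region = 1
    for node in [{'region': 2}, {'region': 2}, {'region': 1}]:
        if node['region'] in synced_remote_regions:
            continue  # the second region-2 node is skipped this pass
        success = True  # pretend self.sync(node, job, suffixes) succeeded
        if success and node['region'] != job_region:
            synced_remote_regions.add(node['region'])

    assert synced_remote_regions == set([2])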
@@ -450,11 +501,13 @@ class ObjectReplicator(Daemon):
jobs.append(
dict(path=job_path,
device=local_dev['device'],
obj_path=obj_path,
nodes=nodes,
delete=len(nodes) > len(part_nodes) - 1,
policy_idx=policy.idx,
partition=partition,
object_ring=obj_ring))
object_ring=obj_ring,
region=local_dev['region']))
except ValueError:
continue
return jobs
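
A sketch of the resulting job dict with hypothetical values; obj_path and region are the two keys this change adds, consumed by update_deleted() when it builds deletion paths and compares each node's region against the local one:

    job = dict(path='/srv/node/sda/objects/1',
               device='sda',
               obj_path='/srv/node/sda/objects',  # new: base objects dir
               nodes=[{'region': 2, 'device': 'sda'}],
               delete=True,
               policy_idx=0,
               partition='1',
               object_ring=None,                  # a Ring instance in practice
               region=1)                          # new: local device's region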

swift/obj/ssync_sender.py

@@ -14,6 +14,7 @@
# limitations under the License.
import urllib
from itertools import ifilter
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
@@ -28,7 +29,7 @@ class Sender(object):
process is there.
"""
def __init__(self, daemon, node, job, suffixes):
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
self.daemon = daemon
self.node = node
self.job = job
@@ -37,7 +38,11 @@ class Sender(object):
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
self.send_list = None
self.available_set = set()
# When remote_check_objs is given, ssync_sender only checks
# whether those objects exist on the remote node; it sends no
# updates.
self.remote_check_objs = remote_check_objs
self.send_list = []
self.failures = 0
@property
@@ -45,8 +50,16 @@
return int(self.job.get('policy_idx', 0))
def __call__(self):
"""
Perform ssync with the remote node.
:returns: a 2-tuple, in the form (success, can_delete_objs).
Success is a boolean, and can_delete_objs is an iterable of strings
representing the hashes which are in sync with the remote node.
"""
if not self.suffixes:
return True
return True, set()
try:
# Double try blocks in case our main error handler fails.
try:
@@ -57,9 +70,20 @@
# other exceptions will be logged with a full stack trace.
self.connect()
self.missing_check()
self.updates()
if not self.remote_check_objs:
self.updates()
can_delete_obj = self.available_set
else:
# when we are initialized with remote_check_objs we don't
# *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion
can_delete_obj = self.available_set.difference(
self.send_list)
self.disconnect()
return self.failures == 0
if not self.failures:
return True, can_delete_obj
else:
return False, set()
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
@@ -85,7 +109,7 @@
# would only get called if the above except Exception handler
# failed (bad node or job data).
self.daemon.logger.exception('EXCEPTION in replication.Sender')
return False
return False, set()
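
A sketch of the two modes __call__ now distinguishes, with hypothetical hashes: in normal mode every hash offered during missing_check is a deletion candidate once the run succeeds; with remote_check_objs set, the candidates are the offered hashes the remote did not ask for, i.e. available_set minus send_list:

    available_set = set(['h1', 'h2', 'h3'])  # hashes offered in missing_check
    send_list = ['h2']                       # hashes the remote asked us to send
    can_delete_obj = available_set.difference(send_list)
    assert can_delete_obj == set(['h1', 'h3'])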
def connect(self):
"""
@@ -96,7 +120,7 @@
self.daemon.conn_timeout, 'connect send'):
self.connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
self.node['replication_port']))
self.connection.putrequest('REPLICATION', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
@@ -169,10 +193,14 @@
self.daemon.node_timeout, 'missing_check start'):
msg = ':MISSING_CHECK: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for path, object_hash, timestamp in \
self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'],
self.policy_idx, self.suffixes):
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'],
self.policy_idx, self.suffixes)
if self.remote_check_objs:
hash_gen = ifilter(lambda (path, object_hash, timestamp):
object_hash in self.remote_check_objs, hash_gen)
for path, object_hash, timestamp in hash_gen:
self.available_set.add(object_hash)
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
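
A sketch of the filtering above (a Python 2 idiom, as in the diff: ifilter with a tuple-unpacking lambda), using hypothetical hashes: when remote_check_objs is given, only those hashes are offered in the missing_check exchange:

    from itertools import ifilter  # Python 2 only

    hash_gen = iter([('/objects/9/abc/h1', 'h1', '1380144470.00000'),
                     ('/objects/9/abc/h2', 'h2', '1380144470.00000')])
    remote_check_objs = set(['h2'])
    hash_gen = ifilter(lambda (path, object_hash, timestamp):
                       object_hash in remote_check_objs, hash_gen)
    assert [h for _path, h, _ts in hash_gen] == ['h2']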
@@ -197,7 +225,6 @@
elif line:
raise exceptions.ReplicationException(
'Unexpected response: %r' % line[:1024])
self.send_list = []
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'missing_check line wait'):

test/unit/obj/test_replicator.py

@@ -22,13 +22,15 @@ import cPickle as pickle
import time
import tempfile
from contextlib import contextmanager, closing
from errno import ENOENT, ENOTEMPTY, ENOTDIR
from eventlet.green import subprocess
from eventlet import Timeout, tpool
from test.unit import FakeLogger, patch_policies
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
storage_directory
from swift.common import ring
from swift.obj import diskfile, replicator as object_replicator
from swift.common.storage_policy import StoragePolicy, POLICIES
@@ -84,9 +86,20 @@ class MockProcess(object):
def __init__(self, *args, **kwargs):
targs = MockProcess.check_args.next()
for targ in targs:
if targ not in args[0]:
process_errors.append("Invalid: %s not in %s" % (targ,
args))
# Allow a tuple of candidate targs: with shuffled nodes, any
# one of several alternatives may appear in the args
# (see the sketch after this hunk)
if isinstance(targ, tuple):
allowed = False
for target in targ:
if target in args[0]:
allowed = True
if not allowed:
process_errors.append("Invalid: %s not in %s" % (targ,
args))
else:
if targ not in args[0]:
process_errors.append("Invalid: %s not in %s" % (targ,
args))
self.stdout = self.Stream()
def wait(self):
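
A sketch of the tuple-targ convention introduced above: a plain string targ must appear in the rsync argv, while a tuple targ is satisfied by any one of its members, which tolerates the node shuffling added to update_deleted():

    def targ_matches(targ, argv):
        # tuple: any alternative is acceptable; string: must be present
        if isinstance(targ, tuple):
            return any(t in argv for t in targ)
        return targ in argv

    argv = ['rsync', '/src/objects/1', '127.0.0.2::object/sda/objects/1']
    assert targ_matches('rsync', argv)
    assert targ_matches(('127.0.0.1::object/sda/objects/1',
                         '127.0.0.2::object/sda/objects/1'), argv)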
@@ -112,14 +125,19 @@ def _create_test_rings(path):
[2, 3, 0, 1, 6, 4, 5],
]
intended_devs = [
{'id': 0, 'device': 'sda', 'zone': 0, 'ip': '127.0.0.0', 'port': 6000},
{'id': 1, 'device': 'sda', 'zone': 1, 'ip': '127.0.0.1', 'port': 6000},
{'id': 2, 'device': 'sda', 'zone': 2, 'ip': '127.0.0.2', 'port': 6000},
{'id': 3, 'device': 'sda', 'zone': 4, 'ip': '127.0.0.3', 'port': 6000},
{'id': 4, 'device': 'sda', 'zone': 5, 'ip': '127.0.0.4', 'port': 6000},
{'id': 0, 'device': 'sda', 'zone': 0,
'region': 1, 'ip': '127.0.0.0', 'port': 6000},
{'id': 1, 'device': 'sda', 'zone': 1,
'region': 2, 'ip': '127.0.0.1', 'port': 6000},
{'id': 2, 'device': 'sda', 'zone': 2,
'region': 1, 'ip': '127.0.0.2', 'port': 6000},
{'id': 3, 'device': 'sda', 'zone': 4,
'region': 2, 'ip': '127.0.0.3', 'port': 6000},
{'id': 4, 'device': 'sda', 'zone': 5,
'region': 1, 'ip': '127.0.0.4', 'port': 6000},
{'id': 5, 'device': 'sda', 'zone': 6,
'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
{'id': 6, 'device': 'sda', 'zone': 7,
'region': 2, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
{'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
]
intended_part_shift = 30
@@ -200,10 +218,11 @@ class TestObjectReplicator(unittest.TestCase):
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
rsync_mods = tuple(['%s::object/sda/objects/%s' %
(node['ip'], cur_part) for node in nodes])
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
(0, '', ['rsync', whole_path_from, rsync_mods]))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
@@ -233,10 +252,11 @@ class TestObjectReplicator(unittest.TestCase):
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
(node['ip'], cur_part) for node in nodes])
for node in nodes:
rsync_mod = '%s::object/sda/objects-1/%s' % (node['ip'], cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
(0, '', ['rsync', whole_path_from, rsync_mods]))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
@@ -530,6 +550,40 @@ class TestObjectReplicator(unittest.TestCase):
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
if node['region'] != 1:
# make the rsync calls for the other region fail
ret_code = 1
else:
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
def test_delete_partition_override_params(self):
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
mkdirs(df._datadir)
@@ -564,6 +618,190 @@ class TestObjectReplicator(unittest.TestCase):
self.assertFalse(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.access(pol0_part_path, os.F_OK))
def test_delete_partition_ssync(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
def _fake_ssync(node, job, suffixes, **kwargs):
success = True
ret_val = [whole_path_from]
if self.call_nums == 2:
# fail the sync to simulate a remote node that does not
# have the replica yet, so nothing may be deleted this pass
success = False
ret_val = []
self.call_nums += 1
return success, set(ret_val)
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
# The file should be deleted at the second replicate call
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
# The partition should be deleted at the third replicate call
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertFalse(os.access(part_path, os.F_OK))
del self.call_nums
def test_delete_partition_ssync_with_sync_failure(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
def _fake_ssync(node, job, suffixes, **kwargs):
success = False
ret_val = []
if self.call_nums == 2:
# succeed on this call only; the other nodes' syncs fail,
# so the objects are never safe to delete
success = True
ret_val = [whole_path_from]
self.call_nums += 1
return success, set(ret_val)
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
del self.call_nums
def test_delete_partition_ssync_with_cleanup_failure(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.logger = mock_logger = mock.MagicMock()
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
def _fake_ssync(node, job, suffixes, **kwargs):
success = True
ret_val = [whole_path_from]
if self.call_nums == 2:
# fail the sync to simulate a remote node that does not
# have the replica yet, so nothing may be deleted this pass
success = False
ret_val = []
self.call_nums += 1
return success, set(ret_val)
rmdir_func = os.rmdir
def raise_exception_rmdir(exception_class, error_no):
instance = exception_class()
instance.errno = error_no
def func(directory):
if directory == suffix_dir_path:
raise instance
else:
rmdir_func(directory)
return func
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
# Fail with ENOENT
with mock.patch('os.rmdir',
raise_exception_rmdir(OSError, ENOENT)):
self.replicator.replicate()
self.assertEquals(mock_logger.exception.call_count, 0)
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
# Fail with ENOTEMPTY
with mock.patch('os.rmdir',
raise_exception_rmdir(OSError, ENOTEMPTY)):
self.replicator.replicate()
self.assertEquals(mock_logger.exception.call_count, 0)
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
# Fail with ENOTDIR
with mock.patch('os.rmdir',
raise_exception_rmdir(OSError, ENOTDIR)):
self.replicator.replicate()
self.assertEquals(mock_logger.exception.call_count, 1)
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
# Finally we can clean up everything
self.replicator.replicate()
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertFalse(os.access(part_path, os.F_OK))
def test_run_once_recover_from_failure(self):
conf = dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1')
@@ -781,7 +1019,8 @@ class TestObjectReplicator(unittest.TestCase):
resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
'ada0f4eee69494ff'})
set_default(self)
self.replicator.sync = fake_func = mock.MagicMock()
self.replicator.sync = fake_func = \
mock.MagicMock(return_value=(True, []))
self.replicator.update(local_job)
reqs = []
for node in local_job['nodes']:
@@ -792,6 +1031,26 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(self.replicator.suffix_sync, 2)
self.assertEquals(self.replicator.suffix_hash, 1)
self.assertEquals(self.replicator.suffix_count, 1)
# Efficient Replication Case
set_default(self)
self.replicator.sync = fake_func = \
mock.MagicMock(return_value=(True, []))
all_jobs = self.replicator.collect_jobs()
job = None
for tmp in all_jobs:
if tmp['partition'] == '3':
job = tmp
break
# The candidate nodes to replicate (i.e. dev1 and dev3)
# belong to another region
self.replicator.update(job)
self.assertEquals(fake_func.call_count, 1)
self.assertEquals(self.replicator.replication_count, 1)
self.assertEquals(self.replicator.suffix_sync, 1)
self.assertEquals(self.replicator.suffix_hash, 1)
self.assertEquals(self.replicator.suffix_count, 1)
mock_http.reset_mock()
mock_logger.reset_mock()

test/unit/obj/test_ssync_sender.py

@@ -137,7 +137,9 @@ class TestSender(unittest.TestCase):
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -154,7 +156,9 @@
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -167,7 +171,9 @@
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.exception.mock_calls[0]
self.assertEqual(
call[1],
@@ -181,7 +187,9 @@
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
self.replicator.logger.exception.assert_called_once_with(
'EXCEPTION in replication.Sender')
@@ -191,7 +199,9 @@
self.sender.missing_check = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
self.assertTrue(self.sender())
success, candidates = self.sender()
self.assertTrue(success)
self.assertEquals(candidates, set())
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
@@ -204,7 +214,9 @@
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
self.sender.failures = 1
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
@@ -243,6 +255,94 @@
method_name, mock_method.mock_calls,
expected_calls))
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
job = {'device': 'dev', 'partition': '9'}
self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
def yield_hashes(device, partition, policy_index, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy_index == 0:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
job = {'device': 'dev', 'partition': '9'}
self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
self.assertEqual(candidates, set())
def test_connect_send_timeout(self):
self.replicator.conn_timeout = 0.01
node = dict(replication_ip='1.2.3.4', replication_port=5678,
@@ -257,7 +357,9 @@
with mock.patch.object(
ssync_sender.bufferedhttp.BufferedHTTPConnection,
'putrequest', putrequest):
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -279,7 +381,9 @@
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
FakeBufferedHTTPConnection):
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -302,7 +406,9 @@
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
FakeBufferedHTTPConnection):
self.assertFalse(self.sender())
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -389,6 +495,7 @@
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
self.assertEqual(self.sender.available_set, set())
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -431,6 +538,10 @@
'33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
candidates = ['9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0def',
'9d41d8cd98f00b204e9800998ecf1def']
self.assertEqual(self.sender.available_set, set(candidates))
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -462,6 +573,8 @@
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_set,
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -494,6 +607,8 @@
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_set,
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -525,6 +640,8 @@
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_set,
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_send_list(self):
def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -556,6 +673,8 @@
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, ['0123abc'])
self.assertEqual(self.sender.available_set,
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_updates_timeout(self):
self.sender.connection = FakeConnection()