Efficient Replication for Distributed Regions
This change provides an efficient way to replicate between the regions of a globally distributed cluster. With this approach, the object-replicator pushes replicas to one primary node in a remote region and then skips the remaining primary nodes in that region, expecting replication within the region to happen asynchronously.

The implementation includes two changes to ssync_sender so that the object-replicator can correctly delete its local handoff objects. The first is to return a list of objects that already exist in the remote region; the list contains the local paths of objects present on both the local and the remote device. The second is an existence check for a specified set of objects, which consumes the list built by the first change: when the object list is given, ssync_sender performs only a missing_check against that list. These changes are needed because Swift previously had no object-level existence check.

Note that with rsync this feature works only partially (i.e. only for primary-to-primary transfers).

Implements: blueprint efficient-replication
Change-Id: I5d990444d7977f4127bb37f9256212c893438df1
This commit is contained in:
parent 6e898aa3a7
commit 20ca279d74
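The core of the change is in the handoff-deletion path of the object-replicator: a partition is pushed to one primary node per remote region, the object hashes that node reports as in sync are recorded, and the remaining primaries in that region are only existence-checked. A local handoff object becomes deletable once every checked node reports it in sync. The following minimal sketch illustrates that flow under stated assumptions; replicate_handoff, sync_to, and existence_check are hypothetical stand-ins, not functions from this patch:

def replicate_handoff(nodes, local_region, sync_to, existence_check):
    # Track, per remote region, the hashes the first contacted primary
    # already holds; later primaries in that region are only checked.
    synced_remote_regions = {}
    delete_objs = None
    for node in nodes:
        if node['region'] in synced_remote_regions:
            # Already pushed to this remote region: only verify that the
            # previously synced objects exist on this node.
            in_sync = existence_check(
                node, synced_remote_regions[node['region']])
        else:
            # First node contacted in this region: do a full push.
            in_sync = sync_to(node)
        if node['region'] != local_region and in_sync:
            synced_remote_regions[node['region']] = in_sync
        # An object may be deleted locally only if every node checked
        # so far reports it as in sync.
        in_sync = set(in_sync)
        delete_objs = (in_sync if delete_objs is None
                       else delete_objs.intersection(in_sync))
    return delete_objs or set()

Intersecting the per-node results mirrors the patch's use of set.intersection over synced_remote_regions: a handoff object stays on disk until every responsible node is known to hold it.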
swift/obj/replicator.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 import os
-from os.path import isdir, isfile, join
+import errno
+from os.path import isdir, isfile, join, dirname
 import random
 import shutil
 import time
@@ -31,7 +32,7 @@ from swift.common.ring.utils import is_local_device
 from swift.common.utils import whataremyips, unlink_older_than, \
     compute_eta, get_logger, dump_recon_cache, ismount, \
     rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
-    tpool_reraise, config_auto_int_value
+    tpool_reraise, config_auto_int_value, storage_directory
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@@ -95,7 +96,8 @@ class ObjectReplicator(Daemon):
             conf.get('handoff_delete', 'auto'), 0)
         self._diskfile_mgr = DiskFileManager(conf, self.logger)
 
-    def sync(self, node, job, suffixes):  # Just exists for doc anchor point
+    # Just exists for doc anchor point
+    def sync(self, node, job, suffixes, *args, **kwargs):
         """
         Synchronize local suffix directories from a partition with a remote
         node.
@@ -106,7 +108,7 @@
 
         :returns: boolean indicating success or failure
         """
-        return self.sync_method(node, job, suffixes)
+        return self.sync_method(node, job, suffixes, *args, **kwargs)
 
     def get_object_ring(self, policy_idx):
         """
@@ -168,7 +170,7 @@
         sync method in Swift.
         """
         if not os.path.exists(job['path']):
-            return False
+            return False, set()
         args = [
             'rsync',
             '--recursive',
@@ -193,14 +195,15 @@
                 args.append(spath)
                 had_any = True
         if not had_any:
-            return False
+            return False, set()
         data_dir = get_data_dir(job['policy_idx'])
         args.append(join(rsync_module, node['device'],
                          data_dir, job['partition']))
-        return self._rsync(args) == 0
+        return self._rsync(args) == 0, set()
 
-    def ssync(self, node, job, suffixes):
-        return ssync_sender.Sender(self, node, job, suffixes)()
+    def ssync(self, node, job, suffixes, remote_check_objs=None):
+        return ssync_sender.Sender(
+            self, node, job, suffixes, remote_check_objs)()
 
     def check_ring(self, object_ring):
         """
@@ -233,9 +236,18 @@
         try:
            responses = []
            suffixes = tpool.execute(tpool_get_suffixes, job['path'])
+            synced_remote_regions = {}
+            delete_objs = None
            if suffixes:
                for node in job['nodes']:
-                    success = self.sync(node, job, suffixes)
+                    kwargs = {}
+                    if node['region'] in synced_remote_regions and \
+                            self.conf.get('sync_method') == 'ssync':
+                        kwargs['remote_check_objs'] = \
+                            synced_remote_regions[node['region']]
+                    # cand_objs is a set of object hashes that are
+                    # candidates for deletion
+                    success, cand_objs = self.sync(
+                        node, job, suffixes, **kwargs)
                    if success:
                        with Timeout(self.http_timeout):
                            conn = http_connect(
@@ -244,7 +256,14 @@
                                node['device'], job['partition'], 'REPLICATE',
                                '/' + '-'.join(suffixes), headers=self.headers)
                            conn.getresponse().read()
+                        if node['region'] != job['region'] and cand_objs:
+                            synced_remote_regions[node['region']] = cand_objs
                    responses.append(success)
+            for region, cand_objs in synced_remote_regions.iteritems():
+                if delete_objs is None:
+                    delete_objs = cand_objs
+                else:
+                    delete_objs = delete_objs.intersection(cand_objs)
            if self.handoff_delete:
                # delete handoff if we have had handoff_delete successes
                delete_handoff = len([resp for resp in responses if resp]) >= \
@@ -254,14 +273,37 @@
                delete_handoff = len(responses) == len(job['nodes']) and \
                    all(responses)
            if not suffixes or delete_handoff:
-                self.logger.info(_("Removing partition: %s"), job['path'])
-                tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
+                if delete_objs:
+                    self.logger.info(_("Removing %s objects"),
+                                     len(delete_objs))
+                    delete_objs = [
+                        storage_directory(job['obj_path'],
+                                          job['partition'],
+                                          object_hash)
+                        for object_hash in delete_objs]
+                    self.delete_handoff_paths(delete_objs)
+                else:
+                    self.logger.info(_("Removing partition: %s"), job['path'])
+                    tpool.execute(
+                        shutil.rmtree, job['path'], ignore_errors=True)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing handoff partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since('partition.delete.timing', begin)
 
+    def delete_handoff_paths(self, paths):
+        for object_path in paths:
+            tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
+            suffix_dir = dirname(object_path)
+            try:
+                os.rmdir(suffix_dir)
+            except OSError as e:
+                if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
+                    self.logger.exception(
+                        "Unexpected error trying to cleanup suffix dir:%r",
+                        suffix_dir)
+
     def update(self, job):
         """
         High-level method that replicates a single partition.
@@ -280,6 +322,8 @@
            self.suffix_hash += hashed
            self.logger.update_stats('suffix.hashes', hashed)
            attempts_left = len(job['nodes'])
+            synced_remote_regions = set()
+            random.shuffle(job['nodes'])
            nodes = itertools.chain(
                job['nodes'],
                job['object_ring'].get_more_nodes(int(job['partition'])))
@@ -287,6 +331,10 @@
                # If this throws StopIteration it will be caught way below
                node = next(nodes)
                attempts_left -= 1
+                # if we have already synced to this remote region,
+                # don't sync again on this replication pass
+                if node['region'] in synced_remote_regions:
+                    continue
                try:
                    with Timeout(self.http_timeout):
                        resp = http_connect(
@@ -320,7 +368,7 @@
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
-                    self.sync(node, job, suffixes)
+                    success, _junk = self.sync(node, job, suffixes)
                    with Timeout(self.http_timeout):
                        conn = http_connect(
                            node['replication_ip'], node['replication_port'],
@@ -328,6 +376,9 @@
                            '/' + '-'.join(suffixes),
                            headers=self.headers)
                        conn.getresponse().read()
+                    # add only the remote region when replication succeeded
+                    if success and node['region'] != job['region']:
+                        synced_remote_regions.add(node['region'])
                    self.suffix_sync += len(suffixes)
                    self.logger.update_stats('suffix.syncs', len(suffixes))
                except (Exception, Timeout):
@@ -450,11 +501,13 @@
                    jobs.append(
                        dict(path=job_path,
                             device=local_dev['device'],
+                             obj_path=obj_path,
                             nodes=nodes,
                             delete=len(nodes) > len(part_nodes) - 1,
                             policy_idx=policy.idx,
                             partition=partition,
-                             object_ring=obj_ring))
+                             object_ring=obj_ring,
+                             region=local_dev['region']))
                except ValueError:
                    continue
        return jobs
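The ssync_sender hunks below change Sender.__call__ from returning a bare boolean to returning a (success, can_delete_objs) pair, and add a remote_check_objs mode in which missing_check runs but no updates are sent. The sketch below is a hedged illustration of that caller-side contract, not the real Sender (which speaks the ssync wire protocol); fake_sender and its parameters are illustrative stand-ins:

def fake_sender(available_hashes, remote_missing, check_only=False):
    # Mimic the new contract: compute which local hashes the remote is
    # missing (the "send list"), optionally skip sending them, and report
    # the hashes already in sync as candidates for local deletion.
    send_list = [h for h in available_hashes if h in remote_missing]
    if not check_only:
        pass  # a real sender would transfer send_list here
    return True, set(available_hashes) - set(send_list)

# The remote already has everything, so the local handoff copy is deletable.
success, can_delete = fake_sender(
    ['9d41d8cd98f00b204e9800998ecf0abc'], remote_missing=set())
assert success and can_delete == set(['9d41d8cd98f00b204e9800998ecf0abc'])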
swift/obj/ssync_sender.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import urllib
+from itertools import ifilter
 from swift.common import bufferedhttp
 from swift.common import exceptions
 from swift.common import http
@@ -28,7 +29,7 @@ class Sender(object):
     process is there.
     """
 
-    def __init__(self, daemon, node, job, suffixes):
+    def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
         self.daemon = daemon
         self.node = node
         self.job = job
@@ -37,7 +38,11 @@
         self.response = None
         self.response_buffer = ''
         self.response_chunk_left = 0
-        self.send_list = None
+        self.available_set = set()
+        # When remote_check_objs is given in the job, ssync_sender only
+        # checks whether those objects exist on the remote node.
+        self.remote_check_objs = remote_check_objs
+        self.send_list = []
         self.failures = 0
 
     @property
@@ -45,8 +50,16 @@
         return int(self.job.get('policy_idx', 0))
 
     def __call__(self):
+        """
+        Perform ssync with remote node.
+
+        :returns: a 2-tuple, in the form (success, can_delete_objs).
+
+        Success is a boolean, and can_delete_objs is an iterable of strings
+        representing the hashes which are in sync with the remote node.
+        """
         if not self.suffixes:
-            return True
+            return True, set()
         try:
             # Double try blocks in case our main error handler fails.
             try:
@@ -57,9 +70,20 @@
                 # other exceptions will be logged with a full stack trace.
                 self.connect()
                 self.missing_check()
-                self.updates()
+                if not self.remote_check_objs:
+                    self.updates()
+                    can_delete_obj = self.available_set
+                else:
+                    # when we are initialized with remote_check_objs we don't
+                    # *send* any requested updates; instead we only collect
+                    # what's already in sync and safe for deletion
+                    can_delete_obj = self.available_set.difference(
+                        self.send_list)
                 self.disconnect()
-                return self.failures == 0
+                if not self.failures:
+                    return True, can_delete_obj
+                else:
+                    return False, set()
             except (exceptions.MessageTimeout,
                     exceptions.ReplicationException) as err:
                 self.daemon.logger.error(
@@ -85,7 +109,7 @@
             # would only get called if the above except Exception handler
             # failed (bad node or job data).
             self.daemon.logger.exception('EXCEPTION in replication.Sender')
-            return False
+            return False, set()
 
     def connect(self):
         """
@@ -96,7 +120,7 @@
                 self.daemon.conn_timeout, 'connect send'):
             self.connection = bufferedhttp.BufferedHTTPConnection(
                 '%s:%s' % (self.node['replication_ip'],
-                          self.node['replication_port']))
+                           self.node['replication_port']))
         self.connection.putrequest('REPLICATION', '/%s/%s' % (
             self.node['device'], self.job['partition']))
         self.connection.putheader('Transfer-Encoding', 'chunked')
@@ -169,10 +193,14 @@
                 self.daemon.node_timeout, 'missing_check start'):
             msg = ':MISSING_CHECK: START\r\n'
             self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
-        for path, object_hash, timestamp in \
-                self.daemon._diskfile_mgr.yield_hashes(
-                    self.job['device'], self.job['partition'],
-                    self.policy_idx, self.suffixes):
+        hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+            self.job['device'], self.job['partition'],
+            self.policy_idx, self.suffixes)
+        if self.remote_check_objs:
+            hash_gen = ifilter(lambda (path, object_hash, timestamp):
+                               object_hash in self.remote_check_objs, hash_gen)
+        for path, object_hash, timestamp in hash_gen:
+            self.available_set.add(object_hash)
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout,
                     'missing_check send line'):
@@ -197,7 +225,6 @@
             elif line:
                 raise exceptions.ReplicationException(
                     'Unexpected response: %r' % line[:1024])
-        self.send_list = []
         while True:
             with exceptions.MessageTimeout(
                     self.daemon.http_timeout, 'missing_check line wait'):
test/unit/obj/test_replicator.py
@@ -22,13 +22,15 @@ import cPickle as pickle
 import time
 import tempfile
 from contextlib import contextmanager, closing
+from errno import ENOENT, ENOTEMPTY, ENOTDIR
 
 from eventlet.green import subprocess
 from eventlet import Timeout, tpool
 
 from test.unit import FakeLogger, patch_policies
 from swift.common import utils
-from swift.common.utils import hash_path, mkdirs, normalize_timestamp
+from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
+    storage_directory
 from swift.common import ring
 from swift.obj import diskfile, replicator as object_replicator
 from swift.common.storage_policy import StoragePolicy, POLICIES
@@ -84,9 +86,20 @@ class MockProcess(object):
     def __init__(self, *args, **kwargs):
         targs = MockProcess.check_args.next()
         for targ in targs:
-            if targ not in args[0]:
-                process_errors.append("Invalid: %s not in %s" % (targ,
-                                                                 args))
+            # Allow a tuple of candidate targs
+            # (e.g. either node may match when the nodes are shuffled)
+            if isinstance(targ, tuple):
+                allowed = False
+                for target in targ:
+                    if target in args[0]:
+                        allowed = True
+                if not allowed:
+                    process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                     args))
+            else:
+                if targ not in args[0]:
+                    process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                     args))
         self.stdout = self.Stream()
 
     def wait(self):
@@ -112,14 +125,19 @@ def _create_test_rings(path):
         [2, 3, 0, 1, 6, 4, 5],
     ]
     intended_devs = [
-        {'id': 0, 'device': 'sda', 'zone': 0, 'ip': '127.0.0.0', 'port': 6000},
-        {'id': 1, 'device': 'sda', 'zone': 1, 'ip': '127.0.0.1', 'port': 6000},
-        {'id': 2, 'device': 'sda', 'zone': 2, 'ip': '127.0.0.2', 'port': 6000},
-        {'id': 3, 'device': 'sda', 'zone': 4, 'ip': '127.0.0.3', 'port': 6000},
-        {'id': 4, 'device': 'sda', 'zone': 5, 'ip': '127.0.0.4', 'port': 6000},
+        {'id': 0, 'device': 'sda', 'zone': 0,
+         'region': 1, 'ip': '127.0.0.0', 'port': 6000},
+        {'id': 1, 'device': 'sda', 'zone': 1,
+         'region': 2, 'ip': '127.0.0.1', 'port': 6000},
+        {'id': 2, 'device': 'sda', 'zone': 2,
+         'region': 1, 'ip': '127.0.0.2', 'port': 6000},
+        {'id': 3, 'device': 'sda', 'zone': 4,
+         'region': 2, 'ip': '127.0.0.3', 'port': 6000},
+        {'id': 4, 'device': 'sda', 'zone': 5,
+         'region': 1, 'ip': '127.0.0.4', 'port': 6000},
         {'id': 5, 'device': 'sda', 'zone': 6,
-         'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
-        {'id': 6, 'device': 'sda', 'zone': 7,
+         'region': 2, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
+        {'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
          'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
     ]
     intended_part_shift = 30
@@ -200,10 +218,11 @@ class TestObjectReplicator(unittest.TestCase):
            nodes = [node for node in
                     ring.get_part_nodes(int(cur_part))
                     if node['ip'] not in _ips()]
+            rsync_mods = tuple(['%s::object/sda/objects/%s' %
+                                (node['ip'], cur_part) for node in nodes])
            for node in nodes:
-                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
                process_arg_checker.append(
-                    (0, '', ['rsync', whole_path_from, rsync_mod]))
+                    (0, '', ['rsync', whole_path_from, rsync_mods]))
        with _mock_process(process_arg_checker):
            replicator.run_once()
        self.assertFalse(process_errors)
@@ -233,10 +252,11 @@ class TestObjectReplicator(unittest.TestCase):
            nodes = [node for node in
                     ring.get_part_nodes(int(cur_part))
                     if node['ip'] not in _ips()]
+            rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
+                                (node['ip'], cur_part) for node in nodes])
            for node in nodes:
-                rsync_mod = '%s::object/sda/objects-1/%s' % (node['ip'], cur_part)
                process_arg_checker.append(
-                    (0, '', ['rsync', whole_path_from, rsync_mod]))
+                    (0, '', ['rsync', whole_path_from, rsync_mods]))
        with _mock_process(process_arg_checker):
            replicator.run_once()
        self.assertFalse(process_errors)
@@ -530,6 +550,40 @@ class TestObjectReplicator(unittest.TestCase):
            # The file should still exist
            self.assertTrue(os.access(part_path, os.F_OK))
 
+    def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('1234567890')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            data_dir = ohash[-3:]
+            whole_path_from = os.path.join(self.objects, '1', data_dir)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+            ring = self.replicator.get_object_ring(0)
+            nodes = [node for node in
+                     ring.get_part_nodes(1)
+                     if node['ip'] not in _ips()]
+            process_arg_checker = []
+            for node in nodes:
+                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
+                if node['region'] != 1:
+                    # force the rsync calls to the other region to fail
+                    ret_code = 1
+                else:
+                    ret_code = 0
+                process_arg_checker.append(
+                    (ret_code, '', ['rsync', whole_path_from, rsync_mod]))
+            with _mock_process(process_arg_checker):
+                self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(part_path, os.F_OK))
+
     def test_delete_partition_override_params(self):
         df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
         mkdirs(df._datadir)
@@ -564,6 +618,190 @@ class TestObjectReplicator(unittest.TestCase):
        self.assertFalse(os.access(pol1_part_path, os.F_OK))
        self.assertTrue(os.access(pol0_part_path, os.F_OK))
 
+    def test_delete_partition_ssync(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes, **kwargs):
+                success = True
+                ret_val = [whole_path_from]
+                if self.call_nums == 2:
+                    # have ssync fail on the second candidate node,
+                    # which has not got the replica yet
+                    success = False
+                    ret_val = []
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should be deleted at the second replicate call
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The partition should be deleted at the third replicate call
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertFalse(os.access(part_path, os.F_OK))
+            del self.call_nums
+
+    def test_delete_partition_ssync_with_sync_failure(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes):
+                success = False
+                ret_val = []
+                if self.call_nums == 2:
+                    # have ssync succeed only on the second candidate node,
+                    # which has not got the replica yet
+                    success = True
+                    ret_val = [whole_path_from]
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            del self.call_nums
+
+    def test_delete_partition_ssync_with_cleanup_failure(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            self.replicator.logger = mock_logger = mock.MagicMock()
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes, **kwargs):
+                success = True
+                ret_val = [whole_path_from]
+                if self.call_nums == 2:
+                    # have ssync fail on the second candidate node,
+                    # which has not got the replica yet
+                    success = False
+                    ret_val = []
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            rmdir_func = os.rmdir
+
+            def raise_exception_rmdir(exception_class, error_no):
+                instance = exception_class()
+                instance.errno = error_no
+
+                def func(directory):
+                    if directory == suffix_dir_path:
+                        raise instance
+                    else:
+                        rmdir_func(directory)
+
+                return func
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOENT
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOENT)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 0)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOTEMPTY
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOTEMPTY)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 0)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOTDIR
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOTDIR)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 1)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Finally we can cleanup everything
+            self.replicator.replicate()
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertFalse(os.access(part_path, os.F_OK))
+
     def test_run_once_recover_from_failure(self):
         conf = dict(swift_dir=self.testdir, devices=self.devices,
                     mount_check='false', timeout='300', stats_interval='1')
@@ -781,7 +1019,8 @@ class TestObjectReplicator(unittest.TestCase):
        resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
                                               'ada0f4eee69494ff'})
        set_default(self)
-        self.replicator.sync = fake_func = mock.MagicMock()
+        self.replicator.sync = fake_func = \
+            mock.MagicMock(return_value=(True, []))
        self.replicator.update(local_job)
        reqs = []
        for node in local_job['nodes']:
@@ -792,6 +1031,26 @@ class TestObjectReplicator(unittest.TestCase):
        self.assertEquals(self.replicator.suffix_sync, 2)
        self.assertEquals(self.replicator.suffix_hash, 1)
        self.assertEquals(self.replicator.suffix_count, 1)
+
+        # Efficient Replication Case
+        set_default(self)
+        self.replicator.sync = fake_func = \
+            mock.MagicMock(return_value=(True, []))
+        all_jobs = self.replicator.collect_jobs()
+        job = None
+        for tmp in all_jobs:
+            if tmp['partition'] == '3':
+                job = tmp
+                break
+        # The candidate nodes to replicate (i.e. dev1 and dev3)
+        # belong to another region
+        self.replicator.update(job)
+        self.assertEquals(fake_func.call_count, 1)
+        self.assertEquals(self.replicator.replication_count, 1)
+        self.assertEquals(self.replicator.suffix_sync, 1)
+        self.assertEquals(self.replicator.suffix_hash, 1)
+        self.assertEquals(self.replicator.suffix_count, 1)
+
        mock_http.reset_mock()
        mock_logger.reset_mock()
test/unit/obj/test_ssync_sender.py
@@ -137,7 +137,9 @@ class TestSender(unittest.TestCase):
         job = dict(partition='9')
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -154,7 +156,9 @@ class TestSender(unittest.TestCase):
         job = dict(partition='9')
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -167,7 +171,9 @@
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
         self.sender.connect = 'cause exception'
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.exception.mock_calls[0]
         self.assertEqual(
             call[1],
@@ -181,7 +187,9 @@
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
         self.sender.connect = 'cause exception'
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         self.replicator.logger.exception.assert_called_once_with(
             'EXCEPTION in replication.Sender')
 
@@ -191,7 +199,9 @@
         self.sender.missing_check = mock.MagicMock()
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
-        self.assertTrue(self.sender())
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEquals(candidates, set())
         self.sender.connect.assert_called_once_with()
         self.sender.missing_check.assert_called_once_with()
         self.sender.updates.assert_called_once_with()
@@ -204,7 +214,9 @@
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
         self.sender.failures = 1
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         self.sender.connect.assert_called_once_with()
         self.sender.missing_check.assert_called_once_with()
         self.sender.updates.assert_called_once_with()
@@ -243,6 +255,94 @@
                     method_name, mock_method.mock_calls,
                     expected_calls))
 
+    def test_call_and_missing_check(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+
+        self.sender.connection = FakeConnection()
+        self.sender.job = {'device': 'dev', 'partition': '9'}
+        self.sender.suffixes = ['abc']
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+        self.assertEqual(self.sender.failures, 0)
+
+    def test_call_and_missing_check_with_obj_list(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+        job = {'device': 'dev', 'partition': '9'}
+        self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+                                          ['9d41d8cd98f00b204e9800998ecf0abc'])
+        self.sender.connection = FakeConnection()
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+        self.assertEqual(self.sender.failures, 0)
+
+    def test_call_and_missing_check_with_obj_list_but_required(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+        job = {'device': 'dev', 'partition': '9'}
+        self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+                                          ['9d41d8cd98f00b204e9800998ecf0abc'])
+        self.sender.connection = FakeConnection()
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set())
+
     def test_connect_send_timeout(self):
         self.replicator.conn_timeout = 0.01
         node = dict(replication_ip='1.2.3.4', replication_port=5678,
@@ -257,7 +357,9 @@
         with mock.patch.object(
                 ssync_sender.bufferedhttp.BufferedHTTPConnection,
                 'putrequest', putrequest):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -279,7 +381,9 @@
         with mock.patch.object(
                 ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
                 FakeBufferedHTTPConnection):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -302,7 +406,9 @@
         with mock.patch.object(
                 ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
                 FakeBufferedHTTPConnection):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -389,6 +495,7 @@
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, [])
+        self.assertEqual(self.sender.available_set, set())
 
     def test_missing_check_has_suffixes(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -431,6 +538,10 @@
             '33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, [])
+        candidates = ['9d41d8cd98f00b204e9800998ecf0abc',
+                      '9d41d8cd98f00b204e9800998ecf0def',
+                      '9d41d8cd98f00b204e9800998ecf1def']
+        self.assertEqual(self.sender.available_set, set(candidates))
 
     def test_missing_check_far_end_disconnect(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -462,6 +573,8 @@
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_far_end_disconnect2(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -494,6 +607,8 @@
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_far_end_unexpected(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -525,6 +640,8 @@
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_send_list(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -556,6 +673,8 @@
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, ['0123abc'])
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_updates_timeout(self):
         self.sender.connection = FakeConnection()