Merge "Fix object replicator partition cleanup"
This commit is contained in:
commit
23f55b2ebc
@ -242,7 +242,7 @@ class ObjectReplicator(Daemon):
|
||||
for node in job['nodes']:
|
||||
kwargs = {}
|
||||
if node['region'] in synced_remote_regions and \
|
||||
self.conf.get('sync_method') == 'ssync':
|
||||
self.conf.get('sync_method', 'rsync') == 'ssync':
|
||||
kwargs['remote_check_objs'] = \
|
||||
synced_remote_regions[node['region']]
|
||||
# cand_objs is a list of objects for deletion
|
||||
@ -273,11 +273,12 @@ class ObjectReplicator(Daemon):
|
||||
delete_handoff = len(responses) == len(job['nodes']) and \
|
||||
all(responses)
|
||||
if delete_handoff:
|
||||
if delete_objs:
|
||||
if (self.conf.get('sync_method', 'rsync') == 'ssync' and
|
||||
delete_objs is not None):
|
||||
self.logger.info(_("Removing %s objects"),
|
||||
len(delete_objs))
|
||||
self.delete_handoff_objs(job, delete_objs)
|
||||
elif self.conf.get('sync_method') == 'rsync':
|
||||
else:
|
||||
self.delete_partition(job['path'])
|
||||
elif not suffixes:
|
||||
self.delete_partition(job['path'])
|
||||
|
@ -117,14 +117,14 @@ def _mock_process(ret):
|
||||
object_replicator.subprocess.Popen = orig_process
|
||||
|
||||
|
||||
def _create_test_rings(path):
|
||||
def _create_test_rings(path, devs=None):
|
||||
testgz = os.path.join(path, 'object.ring.gz')
|
||||
intended_replica2part2dev_id = [
|
||||
[0, 1, 2, 3, 4, 5, 6],
|
||||
[1, 2, 3, 0, 5, 6, 4],
|
||||
[2, 3, 0, 1, 6, 4, 5],
|
||||
]
|
||||
intended_devs = [
|
||||
intended_devs = devs or [
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
'region': 1, 'ip': '127.0.0.0', 'port': 6000},
|
||||
{'id': 1, 'device': 'sda', 'zone': 1,
|
||||
@ -153,6 +153,8 @@ def _create_test_rings(path):
|
||||
ring.RingData(intended_replica2part2dev_id,
|
||||
intended_devs, intended_part_shift),
|
||||
f)
|
||||
for policy in POLICIES:
|
||||
policy.object_ring = None # force reload
|
||||
return
|
||||
|
||||
|
||||
@ -418,6 +420,81 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
self.replicator.replicate()
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_default_sync_method(self):
|
||||
self.replicator.conf.pop('sync_method')
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
|
||||
mkdirs(df._datadir)
|
||||
f = open(os.path.join(df._datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
ring = self.replicator.get_object_ring(0)
|
||||
nodes = [node for node in
|
||||
ring.get_part_nodes(1)
|
||||
if node['ip'] not in _ips()]
|
||||
process_arg_checker = []
|
||||
for node in nodes:
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
||||
process_arg_checker.append(
|
||||
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
with _mock_process(process_arg_checker):
|
||||
self.replicator.replicate()
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_ssync_single_region(self):
|
||||
devs = [
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
'region': 1, 'ip': '127.0.0.0', 'port': 6000},
|
||||
{'id': 1, 'device': 'sda', 'zone': 1,
|
||||
'region': 1, 'ip': '127.0.0.1', 'port': 6000},
|
||||
{'id': 2, 'device': 'sda', 'zone': 2,
|
||||
'region': 1, 'ip': '127.0.0.2', 'port': 6000},
|
||||
{'id': 3, 'device': 'sda', 'zone': 4,
|
||||
'region': 1, 'ip': '127.0.0.3', 'port': 6000},
|
||||
{'id': 4, 'device': 'sda', 'zone': 5,
|
||||
'region': 1, 'ip': '127.0.0.4', 'port': 6000},
|
||||
{'id': 5, 'device': 'sda', 'zone': 6,
|
||||
'region': 1, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
|
||||
{'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
|
||||
'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
|
||||
]
|
||||
_create_test_rings(self.testdir, devs=devs)
|
||||
self.conf['sync_method'] = 'ssync'
|
||||
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
||||
self.replicator.logger = debug_logger()
|
||||
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
|
||||
mkdirs(df._datadir)
|
||||
f = open(os.path.join(df._datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
whole_path_from = storage_directory(self.objects, 1, ohash)
|
||||
suffix_dir_path = os.path.dirname(whole_path_from)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
|
||||
def _fake_ssync(node, job, suffixes, **kwargs):
|
||||
return True, set([ohash])
|
||||
|
||||
self.replicator.sync_method = _fake_ssync
|
||||
self.replicator.replicate()
|
||||
self.assertFalse(os.access(whole_path_from, os.F_OK))
|
||||
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_1(self):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
|
Loading…
Reference in New Issue
Block a user