From 9c44bb224fb9f61d6ed4bb8d9d035bf78bd9abb1 Mon Sep 17 00:00:00 2001
From: David Goetz
Date: Fri, 29 Oct 2010 15:26:35 -0700
Subject: [PATCH 1/5] adding run_once unit test

---
 swift/obj/replicator.py          |  2 +-
 test/unit/obj/test_replicator.py | 87 +++++++++++++++++++++++++++++++-
 2 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 963f02e375..b2b7303c95 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -232,7 +232,7 @@ class ObjectReplicator(Daemon):
         """
         Execute the rsync binary to replicate a partition.
 
-        :returns: a tuple of (rsync exit code, rsync standard output)
+        :returns: return code of rsync process. 0 is successful
         """
         start_time = time.time()
         ret_val = None
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 06f7d74582..55e9d17e4c 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -22,12 +22,14 @@ from shutil import rmtree
 import cPickle as pickle
 import logging
 import fcntl
+import time
 from contextlib import contextmanager
 from eventlet.green import subprocess
-
-from swift.obj import replicator as object_replicator
+from swift.common.utils import hash_path, mkdirs, normalize_timestamp
 from swift.common import ring
+from swift.obj import replicator as object_replicator
+from swift.obj.server import DiskFile
 
 def _ips():
     return ['127.0.0.0',]
@@ -39,15 +41,53 @@ class NullHandler(logging.Handler):
     def emit(self, record):
         pass
 null_logger = logging.getLogger("testing")
 null_logger.addHandler(NullHandler())
 
+
+def mock_http_connect(status):
+
+    class FakeConn(object):
+
+        def __init__(self, status, *args, **kwargs):
+            self.status = status
+            self.reason = 'Fake'
+            self.host = args[0]
+            self.port = args[1]
+            self.method = args[4]
+            self.path = args[5]
+            self.with_exc = False
+            self.headers = kwargs.get('headers',{})
+
+        def getresponse(self):
+            if self.with_exc:
+                raise Exception('test')
+            return self
+
+        def getheader(self, header):
+            return self.headers[header]
+
+        def read(self, amt=None):
+            return pickle.dumps({})
+
+        def close(self):
+            return
+    return lambda *args, **kwargs: FakeConn(status, *args, **kwargs)
+
+process_errors = []
+
 class MockProcess(object):
     ret_code = None
     ret_log = None
+    check_args = None
 
     class Stream(object):
         def read(self):
             return MockProcess.ret_log.next()
 
     def __init__(self, *args, **kwargs):
+        targs = MockProcess.check_args.next()
+        for targ in targs:
+            if targ not in args[0]:
+                process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                 args))
         self.stdout = self.Stream()
 
     def wait(self):
@@ -58,6 +98,7 @@ def _mock_process(ret):
     orig_process = subprocess.Popen
     MockProcess.ret_code = (i[0] for i in ret)
     MockProcess.ret_log = (i[1] for i in ret)
+    MockProcess.check_args = (i[2] for i in ret)
     object_replicator.subprocess.Popen = MockProcess
     yield
     object_replicator.subprocess.Popen = orig_process
@@ -98,7 +139,9 @@ class TestObjectReplicator(unittest.TestCase):
         os.mkdir(os.path.join(self.devices, 'sda'))
         self.objects = os.path.join(self.devices, 'sda', 'objects')
         os.mkdir(self.objects)
+        self.parts = {}
         for part in ['0','1','2', '3']:
+            self.parts[part] = os.path.join(self.objects, part)
             os.mkdir(os.path.join(self.objects, part))
         self.ring = _create_test_ring(self.testdir)
         self.conf = dict(
@@ -107,6 +150,46 @@ class TestObjectReplicator(unittest.TestCase):
         self.replicator = object_replicator.ObjectReplicator(
             self.conf)
 
+    def tearDown(self):
+        del process_errors[:]
+        rmtree(self.testdir, ignore_errors=1)
+
+    def test_run_once(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        object_replicator.http_connect = mock_http_connect(200)
+
+        cur_part = '0'
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+        process_arg_checker = []
+
+        nodes = [node for node in
+                 self.ring.get_part_nodes(int(cur_part)) \
+                 if node['ip'] not in _ips()]
+
+        for node in nodes:
+            rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
+            process_arg_checker.append((0, '',
+                ['rsync', whole_path_from, rsync_mod]))
+
+        with _mock_process(process_arg_checker):
+            replicator.run_once()
+
+        self.assertFalse(process_errors)
+
 
 #    def test_check_ring(self):
 #        self.replicator.collect_jobs('sda', 0, self.ring)
 #        self.assertTrue(self.replicator.check_ring())

From 2cb61902a316f5e7a034d27015c790ad2c9b04ac Mon Sep 17 00:00:00 2001
From: David Goetz
Date: Wed, 3 Nov 2010 16:08:13 -0700
Subject: [PATCH 2/5] adding extra tests and suffix hash bug fix

---
 swift/obj/replicator.py          |  2 +-
 test/unit/obj/test_replicator.py | 78 ++++++++++++++++++++++++++++----
 2 files changed, 70 insertions(+), 10 deletions(-)

diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index b2b7303c95..0ff90c92e1 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -61,7 +61,7 @@ def hash_suffix(path, reclaim_age):
     elif files:
         files.sort(reverse=True)
         meta = data = tomb = None
-        for filename in files:
+        for filename in files[:]:
             if not meta and filename.endswith('.meta'):
                 meta = filename
             if not data and filename.endswith('.data'):
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 55e9d17e4c..a5e8cb3fe2 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -24,6 +24,7 @@ import logging
 import fcntl
 import time
 from contextlib import contextmanager
+from eventlet import tpool
 from eventlet.green import subprocess
 
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp
@@ -158,37 +159,96 @@ class TestObjectReplicator(unittest.TestCase):
         replicator = object_replicator.ObjectReplicator(
             dict(swift_dir=self.testdir, devices=self.devices,
                  mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
         object_replicator.http_connect = mock_http_connect(200)
-
         cur_part = '0'
         df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
-
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time.time()) + '.data'),
                  'wb')
         f.write('1234567890')
         f.close()
-
         ohash = hash_path('a', 'c', 'o')
         data_dir = ohash[-3:]
         whole_path_from = os.path.join(self.objects, cur_part, data_dir)
         process_arg_checker = []
-
         nodes = [node for node in
                  self.ring.get_part_nodes(int(cur_part)) \
                  if node['ip'] not in _ips()]
-
         for node in nodes:
             rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
-            process_arg_checker.append((0, '',
-                ['rsync', whole_path_from, rsync_mod]))
-
+            process_arg_checker.append(
+                (0, '', ['rsync', whole_path_from, rsync_mod]))
         with _mock_process(process_arg_checker):
             replicator.run_once()
-
         self.assertFalse(process_errors)
+        object_replicator.http_connect = was_connector
+
+    def test_hash_suffix_one_file(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time() - 100) + '.ts'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        object_replicator.hash_suffix(whole_path_from, 101)
+        self.assertEquals(len(os.listdir(self.parts['0'])), 1)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        self.assertEquals(len(os.listdir(self.parts['0'])), 0)
+
+    def test_hash_suffix_multi_file_one(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        for tdiff in [1,50,100,500]:
+            for suff in ['.meta','.data','.ts']:
+                f = open(os.path.join(df.datadir,
+                        normalize_timestamp(int(time.time()) - tdiff) + suff),
+                         'wb')
+                f.write('1234567890')
+                f.close()
+
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hsh_path = os.listdir(whole_path_from)[0]
+        whole_hsh_path = os.path.join(whole_path_from, hsh_path)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        # only the tombstone should be left
+        self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
+
+
+    def test_hash_suffix_multi_file_two(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        for tdiff in [1,50,100,500]:
+            suffs = ['.meta','.data']
+            if tdiff > 50:
+                suffs.append('.ts')
+            for suff in suffs:
+                f = open(os.path.join(df.datadir,
+                        normalize_timestamp(int(time.time()) - tdiff) + suff),
+                         'wb')
+                f.write('1234567890')
+                f.close()
+
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hsh_path = os.listdir(whole_path_from)[0]
+        whole_hsh_path = os.path.join(whole_path_from, hsh_path)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        # only the meta and data should be left
+        self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
 
 #    def test_check_ring(self):
 #        self.replicator.collect_jobs('sda', 0, self.ring)
 #        self.assertTrue(self.replicator.check_ring())

From a71164995aaafd626cc16ba59cd01dbb69e33fe0 Mon Sep 17 00:00:00 2001
From: David Goetz
Date: Fri, 5 Nov 2010 09:15:31 -0700
Subject: [PATCH 3/5] adding tests and splitting out collect jobs

---
 swift/common/utils.py            |   2 +-
 swift/obj/replicator.py          |  59 +++++----
 test/unit/obj/test_replicator.py | 219 +++++++++++++++++++------------
 3 files changed, 167 insertions(+), 113 deletions(-)

diff --git a/swift/common/utils.py b/swift/common/utils.py
index 5a8bf0e1be..9df54f64db 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -29,7 +29,6 @@ from urllib import quote
 from contextlib import contextmanager
 import ctypes
 import ctypes.util
-import fcntl
 import struct
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError
 from tempfile import mkstemp
@@ -622,6 +621,7 @@ def write_pickle(obj, dest, tmp):
         os.fsync(fd)
     renamer(tmppath, dest)
 
+
 def audit_location_generator(devices, datadir, mount_check=True, logger=None):
     '''
     Given a devices path and a data directory, yield (path, device,
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 0ff90c92e1..0eec1920c8 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -470,6 +470,36 @@ class ObjectReplicator(Daemon):
                 self.kill_coros()
         self.last_replication_count = self.replication_count
 
+    def collect_jobs(self):
+        jobs = []
+        ips = whataremyips()
+        for local_dev in [dev for dev in self.object_ring.devs
+                          if dev and dev['ip'] in ips and
+                          dev['port'] == self.port]:
+            dev_path = join(self.devices_dir, local_dev['device'])
+            obj_path = join(dev_path, 'objects')
+            tmp_path = join(dev_path, 'tmp')
+            if self.mount_check and not os.path.ismount(dev_path):
+                self.logger.warn('%s is not mounted' % local_dev['device'])
+                continue
+            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
+            if not os.path.exists(obj_path):
+                continue
+            for partition in os.listdir(obj_path):
+                try:
+                    nodes = [node for node in
+                             self.object_ring.get_part_nodes(int(partition))
+                             if node['id'] != local_dev['id']]
+                    jobs.append(dict(path=join(obj_path, partition),
+                                     nodes=nodes, delete=len(nodes) > 2,
+                                     partition=partition))
+                except ValueError:
+                    continue
+        random.shuffle(jobs)
+        # Partitions that need to be deleted take priority
+        jobs.sort(key=lambda job: not job['delete'])
+        self.job_count = len(jobs)
+        return jobs
+
     def replicate(self):
         """Run a replication pass"""
         self.start = time.time()
@@ -479,38 +509,11 @@
         self.replication_count = 0
         self.last_replication_count = -1
         self.partition_times = []
-        jobs = []
         stats = eventlet.spawn(self.heartbeat)
         lockup_detector = eventlet.spawn(self.detect_lockups)
         try:
-            ips = whataremyips()
             self.run_pool = GreenPool(size=self.concurrency)
-            for local_dev in [
-                dev for dev in self.object_ring.devs
-                if dev and dev['ip'] in ips and dev['port'] == self.port]:
-                dev_path = join(self.devices_dir, local_dev['device'])
-                obj_path = join(dev_path, 'objects')
-                tmp_path = join(dev_path, 'tmp')
-                if self.mount_check and not os.path.ismount(dev_path):
-                    self.logger.warn('%s is not mounted' % local_dev['device'])
-                    continue
-                unlink_older_than(tmp_path, time.time() - self.reclaim_age)
-                if not os.path.exists(obj_path):
-                    continue
-                for partition in os.listdir(obj_path):
-                    try:
-                        nodes = [node for node in
-                                 self.object_ring.get_part_nodes(int(partition))
-                                 if node['id'] != local_dev['id']]
-                        jobs.append(dict(path=join(obj_path, partition),
-                                         nodes=nodes, delete=len(nodes) > 2,
-                                         partition=partition))
-                    except ValueError:
-                        continue
-            random.shuffle(jobs)
-            # Partitions that need to be deleted take priority
-            jobs.sort(key=lambda job: not job['delete'])
-            self.job_count = len(jobs)
+            jobs = self.collect_jobs()
             for job in jobs:
                 if not self.check_ring():
                     self.logger.info(
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index a5e8cb3fe2..58b804460b 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -32,11 +32,14 @@ from swift.common import ring
 from swift.obj import replicator as object_replicator
 from swift.obj.server import DiskFile
 
+
 def _ips():
-    return ['127.0.0.0',]
+    return ['127.0.0.0']
 object_replicator.whataremyips = _ips
 
+
 class NullHandler(logging.Handler):
+
     def emit(self, record):
         pass
 null_logger = logging.getLogger("testing")
 null_logger.addHandler(NullHandler())
@@ -55,7 +58,7 @@ def mock_http_connect(status):
             self.method = args[4]
             self.path = args[5]
             self.with_exc = False
-            self.headers = kwargs.get('headers',{})
+            self.headers = kwargs.get('headers', {})
 
         def getresponse(self):
             if self.with_exc:
@@ -74,12 +77,14 @@ def mock_http_connect(status):
 
 process_errors = []
 
+
 class MockProcess(object):
     ret_code = None
     ret_log = None
     check_args = None
 
     class Stream(object):
+
         def read(self):
             return MockProcess.ret_log.next()
 
@@ -94,6 +99,7 @@ class MockProcess(object):
     def wait(self):
         return self.ret_code.next()
 
+
 @contextmanager
 def _mock_process(ret):
     orig_process = subprocess.Popen
@@ -104,6 +110,7 @@ def _mock_process(ret):
     yield
     object_replicator.subprocess.Popen = orig_process
 
+
 def _create_test_ring(path):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
@@ -141,7 +148,7 @@ class TestObjectReplicator(unittest.TestCase):
         self.objects = os.path.join(self.devices, 'sda', 'objects')
         os.mkdir(self.objects)
         self.parts = {}
-        for part in ['0','1','2', '3']:
+        for part in ['0', '1', '2', '3']:
             self.parts[part] = os.path.join(self.objects, part)
             os.mkdir(os.path.join(self.objects, part))
         self.ring = _create_test_ring(self.testdir)
@@ -206,8 +213,8 @@ class TestObjectReplicator(unittest.TestCase):
     def test_hash_suffix_multi_file_one(self):
         df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
         mkdirs(df.datadir)
-        for tdiff in [1,50,100,500]:
-            for suff in ['.meta','.data','.ts']:
+        for tdiff in [1, 50, 100, 500]:
+            for suff in ['.meta', '.data', '.ts']:
                 f = open(os.path.join(df.datadir,
                         normalize_timestamp(int(time.time()) - tdiff) + suff),
                          'wb')
@@ -224,12 +231,11 @@ class TestObjectReplicator(unittest.TestCase):
         whole_hsh_path = os.path.join(whole_path_from, hsh_path)
 
         object_replicator.hash_suffix(whole_path_from, 99)
         # only the tombstone should be left
         self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
 
-
     def test_hash_suffix_multi_file_two(self):
         df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
         mkdirs(df.datadir)
-        for tdiff in [1,50,100,500]:
-            suffs = ['.meta','.data']
+        for tdiff in [1, 50, 100, 500]:
+            suffs = ['.meta', '.data']
             if tdiff > 50:
                 suffs.append('.ts')
             for suff in suffs:
@@ -249,88 +255,133 @@ class TestObjectReplicator(unittest.TestCase):
         object_replicator.hash_suffix(whole_path_from, 99)
         # only the meta and data should be left
         self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
 
+    def test_invalidate_hash(self):
-#    def test_check_ring(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue(self.replicator.check_ring())
-#        orig_check = self.replicator.next_check
-#        self.replicator.next_check = orig_check - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check
-#        orig_ring_time = self.replicator.object_ring._mtime
-#        self.replicator.object_ring._mtime = orig_ring_time - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check - 30
-#        self.assertFalse(self.replicator.check_ring())
-#
-#    def test_collect_jobs(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue('1' in self.replicator.parts_to_delete)
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['0']['nodes']],
-#            [1,2])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['1']['nodes']],
-#            [1,2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['2']['nodes']],
-#            [2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['3']['nodes']],
-#            [3,1])
-#        for part in ['0', '1', '2', '3']:
-#            self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
-#            self.assertEquals(self.replicator.partitions[part]['path'],
-#                self.objects)
-#
-#    def test_delete_partition(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        part_path = os.path.join(self.objects, '1')
-#        self.assertTrue(os.access(part_path, os.F_OK))
-#        self.replicator.delete_partition('1')
-#        self.assertFalse(os.access(part_path, os.F_OK))
-#
-#    def test_rsync(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('0')
-#
-#    def test_rsync_delete_no(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
-#                            (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [False, True, True])
-#
-#    def test_rsync_delete_yes(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
-#
-#    def test_rsync_delete_yes_with_failure(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
-#
-#    def test_rsync_failed_drive(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(12,'There was an error in file IO'),
-#                            (0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
+
+        def assertFileData(file_path, data):
+            with open(file_path, 'r') as fp:
+                fdata = fp.read()
+                self.assertEquals(fdata, data)
+
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hashes_file = os.path.join(self.objects, '0',
+                                   object_replicator.HASH_FILE)
+        # test that the exception from a non-existent file is caught
+        self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
+                          None)
+        # test that hashes get cleared
+        check_pickle_data = pickle.dumps({data_dir: None},
+                                         object_replicator.PICKLE_PROTOCOL)
+        for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
+            with open(hashes_file, 'wb') as fp:
+                pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
+            object_replicator.invalidate_hash(whole_path_from)
+            assertFileData(hashes_file, check_pickle_data)
+
+    def test_check_ring(self):
+        self.assertTrue(self.replicator.check_ring())
+        orig_check = self.replicator.next_check
+        self.replicator.next_check = orig_check - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check
+        orig_ring_time = self.replicator.object_ring._mtime
+        self.replicator.object_ring._mtime = orig_ring_time - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check - 30
+        self.assertFalse(self.replicator.check_ring())
+
+    def test_collect_jobs(self):
+        jobs = self.replicator.collect_jobs()
+        jobs_to_delete = [j for j in jobs if j['delete']]
+        jobs_to_keep = [j for j in jobs if not j['delete']]
+        jobs_by_part = {}
+        for job in jobs:
+            jobs_by_part[job['partition']] = job
+        self.assertEquals(len(jobs_to_delete), 1)
+        self.assertEquals('1', jobs_to_delete[0]['partition'])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
+        for part in ['0', '1', '2', '3']:
+            for node in jobs_by_part[part]['nodes']:
+                self.assertEquals(node['device'], 'sda')
+            self.assertEquals(jobs_by_part[part]['path'],
+                              os.path.join(self.objects, part))
+
+    def test_delete_partition(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        part_path = os.path.join(self.objects, '1')
+        self.assertTrue(os.access(part_path, os.F_OK))
+        self.replicator.replicate()
+        self.assertFalse(os.access(part_path, os.F_OK))
+
+    def test_rsync(self):
+        jobs = self.replicator.collect_jobs()
+        job = jobs[0]
+        node = job['nodes'][0]
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        with _mock_process([(0, ''), (0, ''), (0, '')]):
+            self.replicator.rsync(node, job, [data_dir])
+
+    def test_run_once_recover_from_failure(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
+        object_replicator.http_connect = mock_http_connect(200)
+        # Write some files into '1' and run replicate; they should be moved
+        # to the other nodes and then the local partition should get deleted.
+        cur_part = '1'
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+        process_arg_checker = []
+        nodes = [node for node in
+                 self.ring.get_part_nodes(int(cur_part)) \
+                 if node['ip'] not in _ips()]
+        for node in nodes:
+            rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
+            process_arg_checker.append(
+                (0, '', ['rsync', whole_path_from, rsync_mod]))
+        self.assertTrue(os.access(os.path.join(self.objects,
+                                               '1', data_dir, ohash),
+                                  os.F_OK))
+        with _mock_process(process_arg_checker):
+            replicator.run_once()
+        self.assertFalse(process_errors)
+        for i, result in [('0', True), ('1', False),
+                          ('2', True), ('3', True)]:
+            self.assertEquals(os.access(
+                    os.path.join(self.objects,
+                                 i, object_replicator.HASH_FILE),
+                    os.F_OK), result)
+        object_replicator.http_connect = was_connector
 
     def test_run(self):
-        with _mock_process([(0,'')]*100):
+        with _mock_process([(0, '')]*100):
             self.replicator.replicate()
 
     def test_run_withlog(self):
-        with _mock_process([(0,"stuff in log")]*100):
+        with _mock_process([(0, "stuff in log")]*100):
             self.replicator.replicate()
 
 if __name__ == '__main__':

From aed24cf328d63aeb3e091a4ecea6a79cad4af474 Mon Sep 17 00:00:00 2001
From: David Goetz
Date: Tue, 16 Nov 2010 08:32:03 -0800
Subject: [PATCH 4/5] changes from code review

---
 swift/obj/replicator.py          |  6 +++++-
 test/unit/obj/test_replicator.py | 21 ++-------------------
 2 files changed, 7 insertions(+), 20 deletions(-)

diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 0eec1920c8..9b0294627e 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -61,7 +61,7 @@ def hash_suffix(path, reclaim_age):
     elif files:
         files.sort(reverse=True)
         meta = data = tomb = None
-        for filename in files[:]:
+        for filename in list(files):
             if not meta and filename.endswith('.meta'):
                 meta = filename
             if not data and filename.endswith('.data'):
@@ -471,6 +471,10 @@ class ObjectReplicator(Daemon):
         self.last_replication_count = self.replication_count
 
     def collect_jobs(self):
+        """
+        Returns a sorted list of jobs (dictionaries) that specify the
+        partitions, nodes, etc. to be rsynced.
+        """
         jobs = []
         ips = whataremyips()
         for local_dev in [dev for dev in self.object_ring.devs
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 58b804460b..ec69dc0439 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -38,14 +38,6 @@ def _ips():
 object_replicator.whataremyips = _ips
 
 
-class NullHandler(logging.Handler):
-
-    def emit(self, record):
-        pass
-null_logger = logging.getLogger("testing")
-null_logger.addHandler(NullHandler())
-
-
 def mock_http_connect(status):
 
     class FakeConn(object):
@@ -326,15 +318,6 @@ class TestObjectReplicator(unittest.TestCase):
         self.replicator.replicate()
         self.assertFalse(os.access(part_path, os.F_OK))
 
-    def test_rsync(self):
-        jobs = self.replicator.collect_jobs()
-        job = jobs[0]
-        node = job['nodes'][0]
-        ohash = hash_path('a', 'c', 'o')
-        data_dir = ohash[-3:]
-        with _mock_process([(0, ''), (0, ''), (0, '')]):
-            self.replicator.rsync(node, job, [data_dir])
-
     def test_run_once_recover_from_failure(self):
         replicator = object_replicator.ObjectReplicator(
             dict(swift_dir=self.testdir, devices=self.devices,
@@ -360,11 +360,11 @@ class TestObjectReplicator(unittest.TestCase):
         object_replicator.http_connect = was_connector
 
     def test_run(self):
-        with _mock_process([(0, '')]*100):
+        with _mock_process([(0, '')] * 100):
             self.replicator.replicate()
 
     def test_run_withlog(self):
-        with _mock_process([(0, "stuff in log")]*100):
+        with _mock_process([(0, "stuff in log")] * 100):
             self.replicator.replicate()
 
 if __name__ == '__main__':

From dcd3743defc0efb855ddf8580b8cc0e567339cbf Mon Sep 17 00:00:00 2001
From: David Goetz
Date: Tue, 16 Nov 2010 11:06:39 -0800
Subject: [PATCH 5/5] adding temp dir for the replicator test setup

---
 test/unit/obj/test_replicator.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index ec69dc0439..657570409d 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -23,6 +23,7 @@ import cPickle as pickle
 import logging
 import fcntl
 import time
+import tempfile
 from contextlib import contextmanager
 
 from eventlet import tpool
@@ -131,7 +132,7 @@ class TestObjectReplicator(unittest.TestCase):
 
     def setUp(self):
         # Setup a test ring (stolen from common/test_ring.py)
-        self.testdir = os.path.join('/dev/shm', 'test_replicator')
+        self.testdir = tempfile.mkdtemp()
         self.devices = os.path.join(self.testdir, 'node')
         rmtree(self.testdir, ignore_errors=1)
         os.mkdir(self.testdir)
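Note: the _mock_process fixture these patches build up works by swapping
subprocess.Popen for MockProcess, which consumes one (return code, log
output, required argv fragments) tuple per spawned rsync and records any
argv mismatch in the module-level process_errors list. The sketch below is
a minimal standalone illustration of that pattern; the names FakePopen and
fake_popen are hypothetical, and this is not code from the patches above:

    from contextlib import contextmanager
    import subprocess

    process_errors = []


    class FakePopen(object):
        # One (return_code, stdout_text, required_argv_parts) tuple is
        # consumed per spawned process.
        expected = None

        def __init__(self, args, *pargs, **kwargs):
            code, out, required = next(FakePopen.expected)
            # Record any expected argv fragment missing from the real argv.
            for part in required:
                if part not in args:
                    process_errors.append('%s not in %s' % (part, args))
            self._code = code
            self._out = out
            self.stdout = self  # quacks enough like a pipe for .read()

        def read(self):
            return self._out

        def wait(self):
            return self._code


    @contextmanager
    def fake_popen(expected_calls):
        # Swap Popen in, and always restore it on the way out.
        orig = subprocess.Popen
        FakePopen.expected = iter(expected_calls)
        subprocess.Popen = FakePopen
        try:
            yield
        finally:
            subprocess.Popen = orig

    # Usage: assert the code under test invoked rsync with the expected argv.
    with fake_popen([(0, '', ['rsync', '/srv/node/sda/objects/1'])]):
        subprocess.Popen(['rsync', '/srv/node/sda/objects/1', 'host::object'])
    assert not process_errors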