Adding unit tests for object replicator. Fixing bug in hash_suffix.

This commit is contained in:
David Goetz 2010-11-16 19:52:18 +00:00 committed by Tarmac
commit ceb2bfaa2f
3 changed files with 301 additions and 117 deletions

View File

@ -29,7 +29,6 @@ from urllib import quote
from contextlib import contextmanager
import ctypes
import ctypes.util
import fcntl
import struct
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from tempfile import mkstemp

View File

@ -61,7 +61,7 @@ def hash_suffix(path, reclaim_age):
elif files:
files.sort(reverse=True)
meta = data = tomb = None
for filename in files:
for filename in list(files):
if not meta and filename.endswith('.meta'):
meta = filename
if not data and filename.endswith('.data'):
@ -232,7 +232,7 @@ class ObjectReplicator(Daemon):
"""
Execute the rsync binary to replicate a partition.
:returns: a tuple of (rsync exit code, rsync standard output)
:returns: return code of rsync process. 0 is successful
"""
start_time = time.time()
ret_val = None
@ -470,6 +470,40 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
def collect_jobs(self):
"""
Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced.
"""
jobs = []
ips = whataremyips()
for local_dev in [dev for dev in self.object_ring.devs
if dev and dev['ip'] in ips and dev['port'] == self.port]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, 'objects')
tmp_path = join(dev_path, 'tmp')
if self.mount_check and not os.path.ismount(dev_path):
self.logger.warn('%s is not mounted' % local_dev['device'])
continue
unlink_older_than(tmp_path, time.time() - self.reclaim_age)
if not os.path.exists(obj_path):
continue
for partition in os.listdir(obj_path):
try:
nodes = [node for node in
self.object_ring.get_part_nodes(int(partition))
if node['id'] != local_dev['id']]
jobs.append(dict(path=join(obj_path, partition),
nodes=nodes, delete=len(nodes) > 2,
partition=partition))
except ValueError:
continue
random.shuffle(jobs)
# Partititons that need to be deleted take priority
jobs.sort(key=lambda job: not job['delete'])
self.job_count = len(jobs)
return jobs
def replicate(self):
"""Run a replication pass"""
self.start = time.time()
@ -479,38 +513,11 @@ class ObjectReplicator(Daemon):
self.replication_count = 0
self.last_replication_count = -1
self.partition_times = []
jobs = []
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
try:
ips = whataremyips()
self.run_pool = GreenPool(size=self.concurrency)
for local_dev in [
dev for dev in self.object_ring.devs
if dev and dev['ip'] in ips and dev['port'] == self.port]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, 'objects')
tmp_path = join(dev_path, 'tmp')
if self.mount_check and not os.path.ismount(dev_path):
self.logger.warn('%s is not mounted' % local_dev['device'])
continue
unlink_older_than(tmp_path, time.time() - self.reclaim_age)
if not os.path.exists(obj_path):
continue
for partition in os.listdir(obj_path):
try:
nodes = [node for node in
self.object_ring.get_part_nodes(int(partition))
if node['id'] != local_dev['id']]
jobs.append(dict(path=join(obj_path, partition),
nodes=nodes, delete=len(nodes) > 2,
partition=partition))
except ValueError:
continue
random.shuffle(jobs)
# Partititons that need to be deleted take priority
jobs.sort(key=lambda job: not job['delete'])
self.job_count = len(jobs)
jobs = self.collect_jobs()
for job in jobs:
if not self.check_ring():
self.logger.info(

View File

@ -22,46 +22,88 @@ from shutil import rmtree
import cPickle as pickle
import logging
import fcntl
import time
import tempfile
from contextlib import contextmanager
from eventlet import tpool
from eventlet.green import subprocess
from swift.obj import replicator as object_replicator
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
from swift.obj import replicator as object_replicator
from swift.obj.server import DiskFile
def _ips():
return ['127.0.0.0',]
return ['127.0.0.0']
object_replicator.whataremyips = _ips
class NullHandler(logging.Handler):
def emit(self, record):
pass
null_logger = logging.getLogger("testing")
null_logger.addHandler(NullHandler())
def mock_http_connect(status):
class FakeConn(object):
def __init__(self, status, *args, **kwargs):
self.status = status
self.reason = 'Fake'
self.host = args[0]
self.port = args[1]
self.method = args[4]
self.path = args[5]
self.with_exc = False
self.headers = kwargs.get('headers', {})
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def getheader(self, header):
return self.headers[header]
def read(self, amt=None):
return pickle.dumps({})
def close(self):
return
return lambda *args, **kwargs: FakeConn(status, *args, **kwargs)
process_errors = []
class MockProcess(object):
ret_code = None
ret_log = None
check_args = None
class Stream(object):
def read(self):
return MockProcess.ret_log.next()
def __init__(self, *args, **kwargs):
targs = MockProcess.check_args.next()
for targ in targs:
if targ not in args[0]:
process_errors.append("Invalid: %s not in %s" % (targ,
args))
self.stdout = self.Stream()
def wait(self):
return self.ret_code.next()
@contextmanager
def _mock_process(ret):
orig_process = subprocess.Popen
MockProcess.ret_code = (i[0] for i in ret)
MockProcess.ret_log = (i[1] for i in ret)
MockProcess.check_args = (i[2] for i in ret)
object_replicator.subprocess.Popen = MockProcess
yield
object_replicator.subprocess.Popen = orig_process
def _create_test_ring(path):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
@ -90,7 +132,7 @@ class TestObjectReplicator(unittest.TestCase):
def setUp(self):
# Setup a test ring (stolen from common/test_ring.py)
self.testdir = os.path.join('/dev/shm', 'test_replicator')
self.testdir = tempfile.mkdtemp()
self.devices = os.path.join(self.testdir, 'node')
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
@ -98,7 +140,9 @@ class TestObjectReplicator(unittest.TestCase):
os.mkdir(os.path.join(self.devices, 'sda'))
self.objects = os.path.join(self.devices, 'sda', 'objects')
os.mkdir(self.objects)
for part in ['0','1','2', '3']:
self.parts = {}
for part in ['0', '1', '2', '3']:
self.parts[part] = os.path.join(self.objects, part)
os.mkdir(os.path.join(self.objects, part))
self.ring = _create_test_ring(self.testdir)
self.conf = dict(
@ -107,87 +151,221 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator = object_replicator.ObjectReplicator(
self.conf)
# def test_check_ring(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# self.assertTrue(self.replicator.check_ring())
# orig_check = self.replicator.next_check
# self.replicator.next_check = orig_check - 30
# self.assertTrue(self.replicator.check_ring())
# self.replicator.next_check = orig_check
# orig_ring_time = self.replicator.object_ring._mtime
# self.replicator.object_ring._mtime = orig_ring_time - 30
# self.assertTrue(self.replicator.check_ring())
# self.replicator.next_check = orig_check - 30
# self.assertFalse(self.replicator.check_ring())
#
# def test_collect_jobs(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# self.assertTrue('1' in self.replicator.parts_to_delete)
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['0']['nodes']],
# [1,2])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['1']['nodes']],
# [1,2,3])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['2']['nodes']],
# [2,3])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['3']['nodes']],
# [3,1])
# for part in ['0', '1', '2', '3']:
# self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
# self.assertEquals(self.replicator.partitions[part]['path'],
# self.objects)
#
# def test_delete_partition(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# part_path = os.path.join(self.objects, '1')
# self.assertTrue(os.access(part_path, os.F_OK))
# self.replicator.delete_partition('1')
# self.assertFalse(os.access(part_path, os.F_OK))
#
# def test_rsync(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(0,''), (0,''), (0,'')]):
# self.replicator.rsync('0')
#
# def test_rsync_delete_no(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
# (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [False, True, True])
#
# def test_rsync_delete_yes(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(0,''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
#
# def test_rsync_delete_yes_with_failure(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
#
# def test_rsync_failed_drive(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(12,'There was an error in file IO'),
# (0,''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
def tearDown(self):
process_errors = []
rmtree(self.testdir, ignore_errors=1)
def test_run_once(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1'))
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
cur_part = '0'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in
self.ring.get_part_nodes(int(cur_part)) \
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
object_replicator.http_connect = was_connector
def test_hash_suffix_one_file(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time() - 100) + '.ts'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
object_replicator.hash_suffix(whole_path_from, 101)
self.assertEquals(len(os.listdir(self.parts['0'])), 1)
object_replicator.hash_suffix(whole_path_from, 99)
self.assertEquals(len(os.listdir(self.parts['0'])), 0)
def test_hash_suffix_multi_file_one(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
for suff in ['.meta', '.data', '.ts']:
f = open(os.path.join(df.datadir,
normalize_timestamp(int(time.time()) - tdiff) + suff),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hsh_path = os.listdir(whole_path_from)[0]
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
object_replicator.hash_suffix(whole_path_from, 99)
# only the tombstone should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
def test_hash_suffix_multi_file_two(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
suffs = ['.meta', '.data']
if tdiff > 50:
suffs.append('.ts')
for suff in suffs:
f = open(os.path.join(df.datadir,
normalize_timestamp(int(time.time()) - tdiff) + suff),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hsh_path = os.listdir(whole_path_from)[0]
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
object_replicator.hash_suffix(whole_path_from, 99)
# only the meta and data should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
def test_invalidate_hash(self):
def assertFileData(file_path, data):
with open(file_path, 'r') as fp:
fdata = fp.read()
self.assertEquals(fdata, data)
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hashes_file = os.path.join(self.objects, '0',
object_replicator.HASH_FILE)
# test that non existant file except caught
self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
None)
# test that hashes get cleared
check_pickle_data = pickle.dumps({data_dir: None},
object_replicator.PICKLE_PROTOCOL)
for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
with open(hashes_file, 'wb') as fp:
pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
object_replicator.invalidate_hash(whole_path_from)
assertFileData(hashes_file, check_pickle_data)
def test_check_ring(self):
self.assertTrue(self.replicator.check_ring())
orig_check = self.replicator.next_check
self.replicator.next_check = orig_check - 30
self.assertTrue(self.replicator.check_ring())
self.replicator.next_check = orig_check
orig_ring_time = self.replicator.object_ring._mtime
self.replicator.object_ring._mtime = orig_ring_time - 30
self.assertTrue(self.replicator.check_ring())
self.replicator.next_check = orig_check - 30
self.assertFalse(self.replicator.check_ring())
def test_collect_jobs(self):
jobs = self.replicator.collect_jobs()
jobs_to_delete = [j for j in jobs if j['delete']]
jobs_to_keep = [j for j in jobs if not j['delete']]
jobs_by_part = {}
for job in jobs:
jobs_by_part[job['partition']] = job
self.assertEquals(len(jobs_to_delete), 1)
self.assertTrue('1', jobs_to_delete[0]['partition'])
self.assertEquals(
[node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
self.assertEquals(
[node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
for part in ['0', '1', '2', '3']:
for node in jobs_by_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_part[part]['path'],
os.path.join(self.objects, part))
def test_delete_partition(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
def test_run_once_recover_from_failure(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1'))
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
# Write some files into '1' and run replicate- they should be moved
# to the other partitoins and then node should get deleted.
cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in
self.ring.get_part_nodes(int(cur_part)) \
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects,
'1', data_dir, ohash),
os.F_OK))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
for i, result in [('0', True), ('1', False),
('2', True), ('3', True)]:
self.assertEquals(os.access(
os.path.join(self.objects,
i, object_replicator.HASH_FILE),
os.F_OK), result)
object_replicator.http_connect = was_connector
def test_run(self):
with _mock_process([(0,'')]*100):
with _mock_process([(0, '')] * 100):
self.replicator.replicate()
def test_run_withlog(self):
with _mock_process([(0,"stuff in log")]*100):
with _mock_process([(0, "stuff in log")] * 100):
self.replicator.replicate()
if __name__ == '__main__':