refactor obj-rep a bit and move local hash recalculation to before rsync

commit 716418ba9c
Michael Barton 2011-03-24 19:37:29 +00:00, committed by Tarmac
3 changed files with 37 additions and 52 deletions
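
The gist of the change: instead of rsyncing every suffix whose cached hash disagrees with the remote and recalculating local hashes afterwards, the replicator now refreshes the suspect local hashes first and re-compares, so suffixes that only differed because the local cache was stale are never pushed. A minimal sketch of that reordering (illustrative helper names, not Swift's actual code):

    def sync_to_node(path, remote_hash, get_hashes, rsync):
        # First pass: compare cached local suffix hashes with the remote's.
        _junk, local_hash = get_hashes(path)
        suffixes = [s for s in local_hash
                    if local_hash[s] != remote_hash.get(s, -1)]
        if not suffixes:
            return
        # Refresh only the suspect suffixes, then compare again; anything
        # that now matches was just a stale cache entry.
        _junk, local_hash = get_hashes(path, recalculate=suffixes)
        suffixes = [s for s in local_hash
                    if local_hash[s] != remote_hash.get(s, -1)]
        if suffixes:
            rsync(suffixes)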

swift/obj/replicator.py

@@ -85,37 +85,6 @@ def hash_suffix(path, reclaim_age):
     return md5.hexdigest()


-def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
-    """
-    Recalculates hashes for the given suffixes in the partition and updates
-    them in the partition's hashes file.
-
-    :param partition_dir: directory of the partition in which to recalculate
-    :param suffixes: list of suffixes to recalculate
-    :param reclaim_age: age in seconds at which tombstones should be removed
-    """
-    def tpool_listdir(partition_dir):
-        return dict(((suff, None) for suff in os.listdir(partition_dir)
-                     if len(suff) == 3 and isdir(join(partition_dir, suff))))
-    hashes_file = join(partition_dir, HASH_FILE)
-    with lock_path(partition_dir):
-        try:
-            with open(hashes_file, 'rb') as fp:
-                hashes = pickle.load(fp)
-        except Exception:
-            hashes = tpool.execute(tpool_listdir, partition_dir)
-        for suffix in suffixes:
-            suffix_dir = join(partition_dir, suffix)
-            if os.path.exists(suffix_dir):
-                hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
-            elif suffix in hashes:
-                del hashes[suffix]
-        with open(hashes_file + '.tmp', 'wb') as fp:
-            pickle.dump(hashes, fp, PICKLE_PROTOCOL)
-        renamer(hashes_file + '.tmp', hashes_file)
-
-
 def invalidate_hash(suffix_dir):
     """
     Invalidates the hash for a suffix_dir in the partition's hashes file.
@@ -141,23 +110,21 @@ def invalidate_hash(suffix_dir):
         renamer(hashes_file + '.tmp', hashes_file)


-def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
+def get_hashes(partition_dir, recalculate=[], do_listdir=False,
+               reclaim_age=ONE_WEEK):
     """
     Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
     the hash cache for suffix existence at the (unexpectedly high) cost of a
     listdir.  reclaim_age is just passed on to hash_suffix.

     :param partition_dir: absolute path of partition to get hashes for
+    :param recalculate: list of suffixes which should be recalculated when got
     :param do_listdir: force existence check for all hashes in the partition
     :param reclaim_age: age at which to remove tombstones
     :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
     """
-    def tpool_listdir(hashes, partition_dir):
-        return dict(((suff, hashes.get(suff, None))
-                     for suff in os.listdir(partition_dir)
-                     if len(suff) == 3 and isdir(join(partition_dir, suff))))
     hashed = 0
     hashes_file = join(partition_dir, HASH_FILE)
     with lock_path(partition_dir):
@@ -169,8 +136,12 @@ def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
         except Exception:
             do_listdir = True
         if do_listdir:
-            hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
+            hashes = dict(((suff, hashes.get(suff, None))
+                           for suff in os.listdir(partition_dir)
+                           if len(suff) == 3 and isdir(join(partition_dir, suff))))
             modified = True
+        for hash_ in recalculate:
+            hashes[hash_] = None
         for suffix, hash_ in hashes.items():
             if not hash_:
                 suffix_dir = join(partition_dir, suffix)
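
For reference, the reworked get_hashes can now be driven in three modes: trust the pickled hashes file, force a listdir, or invalidate specific suffixes before hashing. A usage sketch (the partition path and suffix name are made up for illustration):

    from swift.obj.replicator import get_hashes

    part = '/srv/node/sda/objects/0'  # hypothetical partition dir

    # Trust the cached hashes file; only missing hashes are computed.
    hashed, hashes = get_hashes(part)

    # Mistrust the cache for suffix existence; costs a listdir.
    hashed, hashes = get_hashes(part, do_listdir=True)

    # Drop and recompute the named suffixes before returning.
    hashed, hashes = get_hashes(part, recalculate=['a83'])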
@@ -342,8 +313,7 @@ class ObjectReplicator(Daemon):
                 success = self.rsync(node, job, suffixes)
                 if success:
                     with Timeout(self.http_timeout):
-                        http_connect(node['ip'],
-                                     node['port'],
+                        http_connect(node['ip'], node['port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
                             headers={'Content-Length': '0'}).getresponse().read()
@@ -366,7 +336,7 @@ class ObjectReplicator(Daemon):
         self.replication_count += 1
         begin = time.time()
         try:
-            hashed, local_hash = get_hashes(job['path'],
+            hashed, local_hash = tpool.execute(get_hashes, job['path'],
                     do_listdir=(self.replication_count % 10) == 0,
                     reclaim_age=self.reclaim_age)
             self.suffix_hash += hashed
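
get_hashes does blocking filesystem work (listing directories and hashing every file in each unhashed suffix), which would stall eventlet's event loop if called directly from a green thread. eventlet.tpool.execute runs the call in a native worker thread and hands the result back, letting other coroutines make progress meanwhile. A toy demonstration of the same pattern (slow_disk_scan stands in for get_hashes):

    import os
    from eventlet import tpool

    def slow_disk_scan(path):
        # Blocking I/O that would otherwise hold up the eventlet hub.
        return sorted(os.listdir(path))

    entries = tpool.execute(slow_disk_scan, '/tmp')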
@@ -394,14 +364,15 @@ class ObjectReplicator(Daemon):
                     continue
                 remote_hash = pickle.loads(resp.read())
                 del resp
-                suffixes = [suffix for suffix in local_hash
-                            if local_hash[suffix] !=
-                            remote_hash.get(suffix, -1)]
+                suffixes = [suffix for suffix in local_hash if
+                        local_hash[suffix] != remote_hash.get(suffix, -1)]
                 if not suffixes:
                     continue
+                hashed, local_hash = tpool.execute(get_hashes, job['path'],
+                        recalculate=suffixes, reclaim_age=self.reclaim_age)
+                suffixes = [suffix for suffix in local_hash if
+                        local_hash[suffix] != remote_hash.get(suffix, -1)]
                 self.rsync(node, job, suffixes)
-                recalculate_hashes(job['path'], suffixes,
-                                   reclaim_age=self.reclaim_age)
                 with Timeout(self.http_timeout):
                     conn = http_connect(node['ip'], node['port'],
                         node['device'], job['partition'], 'REPLICATE',
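
A note on the -1 default in remote_hash.get(suffix, -1): a suffix the remote has never hashed must always compare unequal, whether the local value is a hexdigest string or a None placeholder, so it ends up in the rsync list. Illustrative values:

    local_hash = {'a83': 'd41d8cd98f00b204e9800998ecf8427e', 'f00': None}
    remote_hash = {'a83': 'd41d8cd98f00b204e9800998ecf8427e'}
    suffixes = [s for s in local_hash
                if local_hash[s] != remote_hash.get(s, -1)]
    # ['f00']: 'a83' matches; 'f00' is absent remotely, and None != -1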
@@ -556,7 +527,7 @@ class ObjectReplicator(Daemon):
             _("Object replication complete. (%.02f minutes)"), total)

     def run_forever(self, *args, **kwargs):
-        self.logger.info("Starting object replicator in daemon mode.")
+        self.logger.info(_("Starting object replicator in daemon mode."))
         # Run the replicator continually
         while True:
             start = time.time()

swift/obj/server.py

@@ -42,8 +42,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, check_mount, \
     check_float, check_utf8
 from swift.common.exceptions import ConnectionTimeout
-from swift.obj.replicator import get_hashes, invalidate_hash, \
-    recalculate_hashes
+from swift.obj.replicator import get_hashes, invalidate_hash


 DATADIR = 'objects'
@@ -583,10 +582,8 @@ class ObjectController(object):
         path = os.path.join(self.devices, device, DATADIR, partition)
         if not os.path.exists(path):
             mkdirs(path)
-        if suffix:
-            recalculate_hashes(path, suffix.split('-'))
-            return Response()
-        _junk, hashes = get_hashes(path, do_listdir=False)
+        suffixes = suffix.split('-') if suffix else []
+        _junk, hashes = tpool.execute(get_hashes, path, recalculate=suffixes)
         return Response(body=pickle.dumps(hashes))

     def __call__(self, env, start_response):
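
On the wire, the REPLICATE verb is symmetric with the replicator code above: the client requests /device/partition (optionally with a /suffix-suffix-... path to force recalculation) and gets back a pickled {suffix: hash} dict. A sketch of the fetch side, with hypothetical node values (the replicator gets these from the ring):

    import pickle
    from swift.common.bufferedhttp import http_connect

    node = {'ip': '127.0.0.1', 'port': 6000, 'device': 'sda'}
    conn = http_connect(node['ip'], node['port'], node['device'], '0',
                        'REPLICATE', '', headers={'Content-Length': '0'})
    remote_hash = pickle.loads(conn.getresponse().read())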

test/unit/obj/test_replicator.py

@@ -188,6 +188,23 @@ class TestObjectReplicator(unittest.TestCase):
             object_replicator.http_connect = was_connector

+    def test_get_hashes(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        with open(os.path.join(df.datadir, normalize_timestamp(
+                    time.time()) + '.ts'), 'wb') as f:
+            f.write('1234567890')
+        part = os.path.join(self.objects, '0')
+        hashed, hashes = object_replicator.get_hashes(part)
+        self.assertEquals(hashed, 1)
+        self.assert_('a83' in hashes)
+        hashed, hashes = object_replicator.get_hashes(part, do_listdir=True)
+        self.assertEquals(hashed, 0)
+        self.assert_('a83' in hashes)
+        hashed, hashes = object_replicator.get_hashes(part, recalculate=['a83'])
+        self.assertEquals(hashed, 1)
+        self.assert_('a83' in hashes)
+
     def test_hash_suffix_one_file(self):
         df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
         mkdirs(df.datadir)