Handle TimeoutErrors with a tpooled get_hashes
This commit is contained in:
commit
f3ab8693fa
@ -163,6 +163,16 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
|
||||
return hashed, hashes
|
||||
|
||||
|
||||
# Hack to work around Eventlet's tpool not catching and reraising Timeouts. We
|
||||
# return the Timeout, Timeout if it's raised, the caller looks for it and
|
||||
# reraises it if found.
|
||||
def tpooled_get_hashes(*args, **kwargs):
|
||||
try:
|
||||
return get_hashes(*args, **kwargs)
|
||||
except Timeout, err:
|
||||
return err, err
|
||||
|
||||
|
||||
class ObjectReplicator(Daemon):
|
||||
"""
|
||||
Replicate objects.
|
||||
@ -336,9 +346,12 @@ class ObjectReplicator(Daemon):
|
||||
self.replication_count += 1
|
||||
begin = time.time()
|
||||
try:
|
||||
hashed, local_hash = tpool.execute(get_hashes, job['path'],
|
||||
hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
|
||||
do_listdir=(self.replication_count % 10) == 0,
|
||||
reclaim_age=self.reclaim_age)
|
||||
# See tpooled_get_hashes "Hack".
|
||||
if isinstance(hashed, BaseException):
|
||||
raise hashed
|
||||
self.suffix_hash += hashed
|
||||
attempts_left = self.object_ring.replica_count - 1
|
||||
nodes = itertools.chain(job['nodes'],
|
||||
@ -368,8 +381,12 @@ class ObjectReplicator(Daemon):
|
||||
local_hash[suffix] != remote_hash.get(suffix, -1)]
|
||||
if not suffixes:
|
||||
continue
|
||||
hashed, local_hash = tpool.execute(get_hashes, job['path'],
|
||||
recalculate=suffixes, reclaim_age=self.reclaim_age)
|
||||
hashed, local_hash = tpool.execute(tpooled_get_hashes,
|
||||
job['path'], recalculate=suffixes,
|
||||
reclaim_age=self.reclaim_age)
|
||||
# See tpooled_get_hashes "Hack".
|
||||
if isinstance(hashed, BaseException):
|
||||
raise hashed
|
||||
suffixes = [suffix for suffix in local_hash if
|
||||
local_hash[suffix] != remote_hash.get(suffix, -1)]
|
||||
self.rsync(node, job, suffixes)
|
||||
|
@ -44,7 +44,7 @@ from swift.common.constraints import check_object_creation, check_mount, \
|
||||
check_float, check_utf8
|
||||
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
||||
DiskFileNotExist
|
||||
from swift.obj.replicator import get_hashes, invalidate_hash
|
||||
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash
|
||||
|
||||
|
||||
DATADIR = 'objects'
|
||||
@ -708,7 +708,11 @@ class ObjectController(object):
|
||||
if not os.path.exists(path):
|
||||
mkdirs(path)
|
||||
suffixes = suffix.split('-') if suffix else []
|
||||
_junk, hashes = tpool.execute(get_hashes, path, recalculate=suffixes)
|
||||
_junk, hashes = tpool.execute(tpooled_get_hashes, path,
|
||||
recalculate=suffixes)
|
||||
# See tpooled_get_hashes "Hack".
|
||||
if isinstance(hashes, BaseException):
|
||||
raise hashes
|
||||
return Response(body=pickle.dumps(hashes))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
|
Loading…
x
Reference in New Issue
Block a user