Merge "Fix object replicator to handle Timeouts fixes: lp 814263"
commit 79fbd95433
@@ -199,14 +199,16 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
     return hashed, hashes
 
 
-# Hack to work around Eventlet's tpool not catching and reraising Timeouts. We
-# return the Timeout, Timeout if it's raised, the caller looks for it and
-# reraises it if found.
 def tpooled_get_hashes(*args, **kwargs):
+    """
+    Hack to work around Eventlet's tpool not catching and reraising Timeouts.
+    We return the Timeout, None if it's raised, the caller looks for it
+    and reraises it if found.
+    """
     try:
         return get_hashes(*args, **kwargs)
     except Timeout, err:
-        return err, err
+        return err, None
 
 
 class ObjectReplicator(Daemon):
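
Note: this hunk is the heart of the fix. Eventlet's Timeout inherits from BaseException rather than Exception, so the tpool machinery in the eventlet releases of this era evidently fails to catch it in the worker thread and reraise it in the calling green thread. The workaround is to trap the Timeout inside the tpooled function and smuggle it back as the first element of the return tuple, leaving the reraise to the caller. A minimal, self-contained sketch of the pattern; risky_work and tpooled_risky_work are hypothetical stand-ins for get_hashes and tpooled_get_hashes:

    from eventlet import Timeout, tpool

    def risky_work():
        # Hypothetical stand-in for get_hashes: pretend an internal
        # operation timed out while running in the tpool worker thread.
        raise Timeout()

    def tpooled_risky_work(*args, **kwargs):
        # Same shape as tpooled_get_hashes: trap the Timeout and hand it
        # back as an ordinary return value instead of letting it escape
        # inside tpool, where it would not be reraised properly.
        try:
            return risky_work(*args, **kwargs)
        except Timeout as err:
            return err, None

    try:
        hashed, hashes = tpool.execute(tpooled_risky_work)
        # Caller side: detect the smuggled exception and reraise it in
        # the green thread, where normal exception handling applies.
        if isinstance(hashed, BaseException):
            raise hashed
    except Timeout:
        print('Timeout propagated to the caller as intended')
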
@@ -422,12 +424,13 @@ class ObjectReplicator(Daemon):
                         local_hash[suffix] != remote_hash.get(suffix, -1)]
                 if not suffixes:
                     continue
-                hashed, local_hash = tpool.execute(tpooled_get_hashes,
+                hashed, recalc_hash = tpool.execute(tpooled_get_hashes,
                     job['path'], recalculate=suffixes,
                     reclaim_age=self.reclaim_age)
                 # See tpooled_get_hashes "Hack".
                 if isinstance(hashed, BaseException):
                     raise hashed
+                local_hash = recalc_hash
                 suffixes = [suffix for suffix in local_hash if
                         local_hash[suffix] != remote_hash.get(suffix, -1)]
                 self.rsync(node, job, suffixes)
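
Note: the caller now unpacks the tuple into a scratch name, recalc_hash, and only assigns local_hash after the isinstance check. Since the hack's failure value is now (err, None) rather than (err, err), this ordering keeps a smuggled Timeout from leaving None in local_hash before the reraise fires. A small illustration of the unpacking pitfall, using hypothetical values in place of the tpool.execute result:

    from eventlet import Timeout

    local_hash = {'a83': 'known-good-suffix-hash'}
    result = (Timeout(), None)   # what tpooled_get_hashes returns on Timeout

    # Old style: unpacking straight into local_hash clobbers it with None
    # before the exception check ever runs.
    hashed, local_hash = result
    assert local_hash is None

    # New style: unpack into a scratch name, check, then assign.
    local_hash = {'a83': 'known-good-suffix-hash'}
    hashed, recalc_hash = result
    if isinstance(hashed, BaseException):
        pass                     # the real code does `raise hashed` here
    else:
        local_hash = recalc_hash
    assert local_hash == {'a83': 'known-good-suffix-hash'}
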
@@ -26,6 +26,7 @@ import time
 import tempfile
 from contextlib import contextmanager
 from eventlet.green import subprocess
+from eventlet import Timeout, tpool
 from test.unit import FakeLogger
 from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp
@@ -201,7 +202,8 @@ class TestObjectReplicator(unittest.TestCase):
         hashed, hashes = object_replicator.get_hashes(part, do_listdir=True)
         self.assertEquals(hashed, 0)
         self.assert_('a83' in hashes)
-        hashed, hashes = object_replicator.get_hashes(part, recalculate=['a83'])
+        hashed, hashes = object_replicator.get_hashes(part,
+                                                      recalculate=['a83'])
         self.assertEquals(hashed, 1)
         self.assert_('a83' in hashes)
 
@@ -364,6 +366,7 @@ class TestObjectReplicator(unittest.TestCase):
             dict(swift_dir=self.testdir, devices=self.devices,
                  mount_check='false', timeout='300', stats_interval='1'))
         was_connector = object_replicator.http_connect
+        try:
             object_replicator.http_connect = mock_http_connect(200)
             # Write some files into '1' and run replicate- they should be moved
             # to the other partitions and then node should get deleted.
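
Note: the try: added here (paired with the finally: in the last hunk below) makes the monkeypatching of object_replicator.http_connect exception-safe: even if an assertion in the test body fails, the saved original is restored, so the stub cannot leak into later tests in the same process. The save/patch/restore shape as a generic, self-contained sketch; some_module and fake_http_connect are hypothetical stand-ins for object_replicator and mock_http_connect(200):

    import types

    # Hypothetical stand-in for the module whose global gets patched.
    some_module = types.SimpleNamespace(http_connect=lambda: 'real connection')

    def fake_http_connect():
        # Hypothetical stub playing the role of mock_http_connect(200).
        return 'fake connection'

    was_connector = some_module.http_connect       # save the real hook
    try:
        some_module.http_connect = fake_http_connect
        assert some_module.http_connect() == 'fake connection'
    finally:
        # Runs even if the assert above fails, so the stub never leaks.
        some_module.http_connect = was_connector
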
@@ -384,7 +387,8 @@ class TestObjectReplicator(unittest.TestCase):
                      self.ring.get_part_nodes(int(cur_part)) \
                          if node['ip'] not in _ips()]
             for node in nodes:
-                rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'], cur_part)
+                rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'],
+                                                             cur_part)
                 process_arg_checker.append(
                     (0, '', ['rsync', whole_path_from, rsync_mod]))
             self.assertTrue(os.access(os.path.join(self.objects,
@@ -399,8 +403,70 @@ class TestObjectReplicator(unittest.TestCase):
                     os.path.join(self.objects,
                                  i, object_replicator.HASH_FILE),
                     os.F_OK), result)
+        finally:
             object_replicator.http_connect = was_connector
 
+    def test_run_once_recover_from_timeout(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
+        was_get_hashes = object_replicator.get_hashes
+        was_execute = tpool.execute
+        self.get_hash_count = 0
+        try:
+
+            def fake_get_hashes(*args, **kwargs):
+                self.get_hash_count += 1
+                if self.get_hash_count == 3:
+                    # raise timeout on last call to get hashes
+                    raise Timeout()
+                return 2, {'abc': 'def'}
+
+            def fake_exc(tester, *args, **kwargs):
+                if 'Error syncing partition' in args[0]:
+                    tester.i_failed = True
+
+            self.i_failed = False
+            object_replicator.http_connect = mock_http_connect(200)
+            object_replicator.get_hashes = fake_get_hashes
+            replicator.logger.exception = \
+                lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
+            # Write some files into '1' and run replicate- they should be moved
+            # to the other partitions and then node should get deleted.
+            cur_part = '1'
+            df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
+                          FakeLogger())
+            mkdirs(df.datadir)
+            f = open(os.path.join(df.datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('1234567890')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            data_dir = ohash[-3:]
+            whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+            process_arg_checker = []
+            nodes = [node for node in
+                     self.ring.get_part_nodes(int(cur_part)) \
+                         if node['ip'] not in _ips()]
+            for node in nodes:
+                rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'],
+                                                             cur_part)
+                process_arg_checker.append(
+                    (0, '', ['rsync', whole_path_from, rsync_mod]))
+            self.assertTrue(os.access(os.path.join(self.objects,
+                                                   '1', data_dir, ohash),
+                                      os.F_OK))
+            with _mock_process(process_arg_checker):
+                replicator.run_once()
+            self.assertFalse(process_errors)
+            self.assertFalse(self.i_failed)
+        finally:
+            object_replicator.http_connect = was_connector
+            object_replicator.get_hashes = was_get_hashes
+            tpool.execute = was_execute
+
     def test_run(self):
         with _mock_process([(0, '')] * 100):
             self.replicator.replicate()
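
Note: the new test injects the failure by counting calls: fake_get_hashes succeeds twice, then raises Timeout on the third call, and the assertions require that run_once completes without failed rsync processes and without ever logging 'Error syncing partition', i.e. the replicator now recovers from the Timeout instead of dying. The call-counting fault injector is a reusable shape; a generic sketch, with names of my own choosing rather than anything from the commit:

    class FailOnNthCall(object):
        """Raise `exc` on the nth invocation, succeed otherwise."""

        def __init__(self, n, exc, result):
            self.n = n
            self.exc = exc
            self.result = result
            self.calls = 0

        def __call__(self, *args, **kwargs):
            self.calls += 1
            if self.calls == self.n:
                raise self.exc()
            return self.result

    # Usage, mirroring fake_get_hashes above:
    fake = FailOnNthCall(3, Exception, (2, {'abc': 'def'}))
    assert fake() == (2, {'abc': 'def'})
    assert fake() == (2, {'abc': 'def'})
    try:
        fake()
    except Exception:
        print('third call failed, as injected')
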