Merge "Remove object replicator's lockup detector/mitigator."
commit 10eb94b3d8

swift/obj/replicator.py

@@ -25,9 +25,8 @@ import six.moves.cPickle as pickle
 from swift import gettext_ as _

 import eventlet
-from eventlet import GreenPool, tpool, Timeout, sleep
+from eventlet import GreenPool, queue, tpool, Timeout, sleep
 from eventlet.green import subprocess
-from eventlet.support.greenlets import GreenletExit

 from swift.common.constraints import check_drive
 from swift.common.ring.utils import is_local_device
@@ -90,7 +89,6 @@ class ObjectReplicator(Daemon):
         if not self.rsync_module:
             self.rsync_module = '{replication_ip}::object'
         self.http_timeout = int(conf.get('http_timeout', 60))
-        self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
         self.rcache = os.path.join(self.recon_cache_path, "object.recon")
@@ -113,6 +111,7 @@ class ObjectReplicator(Daemon):
                                 'handoff_delete before the next '
                                 'normal rebalance')
         self._df_router = DiskFileRouter(conf, self.logger)
+        self._child_process_reaper_queue = queue.LightQueue()

     def _zero_stats(self):
         """Zero out the stats."""
@@ -138,6 +137,36 @@ class ObjectReplicator(Daemon):
             my_replication_ips.add(local_dev['replication_ip'])
         return list(my_replication_ips)

+    def _child_process_reaper(self):
+        """
+        Consume processes from self._child_process_reaper_queue and wait() for
+        them
+        """
+        procs = set()
+        done = False
+        while not done:
+            timeout = 60 if procs else None
+            try:
+                new_proc = self._child_process_reaper_queue.get(
+                    timeout=timeout)
+                if new_proc is not None:
+                    procs.add(new_proc)
+                else:
+                    done = True
+            except queue.Empty:
+                pass
+
+            reaped_procs = set()
+            for proc in procs:
+                try:
+                    # this will reap the process if it has exited, but
+                    # otherwise will not wait
+                    proc.wait(timeout=0)
+                    reaped_procs.add(proc)
+                except subprocess.TimeoutExpired:
+                    pass
+            procs -= reaped_procs
+
     # Just exists for doc anchor point
     def sync(self, node, job, suffixes, *args, **kwargs):
         """
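
The reaper added above is a small producer/consumer loop over an eventlet LightQueue: callers enqueue Popen objects they no longer want to block on, the loop polls them with wait(timeout=0), and a None sentinel tells it to make one final pass and exit. Below is a minimal standalone sketch of the same pattern, illustrative only (the reaper/reap_queue names are not Swift's):

# Sketch of the queue-plus-reaper pattern, assuming only eventlet.
import eventlet
from eventlet import queue
from eventlet.green import subprocess

reap_queue = queue.LightQueue()


def reaper():
    procs = set()
    done = False
    while not done:
        # Block indefinitely while idle; poll every few seconds while
        # there are still children left to reap.
        timeout = 5 if procs else None
        try:
            item = reap_queue.get(timeout=timeout)
            if item is None:
                done = True      # sentinel: finish this pass, then exit
            else:
                procs.add(item)
        except queue.Empty:
            pass
        reaped = set()
        for proc in procs:
            try:
                proc.wait(timeout=0)   # reap if exited, never block
                reaped.add(proc)
            except subprocess.TimeoutExpired:
                pass
        procs -= reaped


reaper_thread = eventlet.spawn(reaper)
child = subprocess.Popen(['sleep', '1'])
reap_queue.put(child)        # hand the child off instead of waiting here
eventlet.sleep(1.5)          # pretend to do other work meanwhile
reap_queue.put(None)         # ask the reaper to wrap up
reaper_thread.wait()
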
@@ -169,7 +198,7 @@ class ObjectReplicator(Daemon):
         :returns: return code of rsync process. 0 is successful
         """
         start_time = time.time()
-        proc = ret_val = None
+        proc = None

         try:
             with Timeout(self.rsync_timeout):
@@ -178,25 +207,28 @@ class ObjectReplicator(Daemon):
                                         stderr=subprocess.STDOUT)
                 results = proc.stdout.read()
                 ret_val = proc.wait()
-        except GreenletExit:
-            self.logger.error(_("Killing by lockup detector"))
-            if proc:
-                # Assume rsync is still responsive and give it a chance
-                # to shut down gracefully
-                proc.terminate()
-                # Final good-faith effort to clean up the process table.
-                # Note that this blocks, but worst-case we wait for the
-                # lockup detector to come around and kill us. This can
-                # happen if the process is stuck down in kernel-space
-                # waiting on I/O or something.
-                proc.wait()
-            raise
         except Timeout:
             self.logger.error(_("Killing long-running rsync: %s"), str(args))
             if proc:
                 proc.kill()
-                proc.wait()
+                try:
+                    # Note: Python 2.7's subprocess.Popen class doesn't take
+                    # any arguments for wait(), but Python 3's does.
+                    # However, Eventlet's replacement Popen takes a timeout
+                    # argument regardless of Python version, so we don't
+                    # need any conditional code here.
+                    proc.wait(timeout=1.0)
+                except subprocess.TimeoutExpired:
+                    # Sometimes a process won't die immediately even after a
+                    # SIGKILL. This can be due to failing disks, high load,
+                    # or other reasons. We can't wait for it forever since
+                    # we're taking up a slot in the (green)thread pool, so
+                    # we send it over to another greenthread, not part of
+                    # our pool, whose sole duty is to wait for child
+                    # processes to exit.
+                    self._child_process_reaper_queue.put(proc)
             return 1  # failure response code
+
         total_time = time.time() - start_time
         for result in results.split('\n'):
             if result == '':
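
The rewritten Timeout handler above is the whole mitigation now: kill the child, give it a bounded one-second wait, and if it still refuses to die, hand it to the reaper greenthread rather than tying up a pool slot. A condensed sketch of just that error path, with hypothetical run_rsync/reaper_queue names (the real method also logs and parses the rsync output):

# Condensed sketch of the kill -> bounded wait -> hand off error path,
# assuming a LightQueue (reaper_queue) serviced by a reaper greenthread.
from eventlet import Timeout
from eventlet.green import subprocess


def run_rsync(args, rsync_timeout, reaper_queue):
    proc = None
    try:
        with Timeout(rsync_timeout):
            proc = subprocess.Popen(args,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            proc.stdout.read()
            return proc.wait()
    except Timeout:
        if proc:
            proc.kill()
            try:
                # One second of good faith...
                proc.wait(timeout=1.0)
            except subprocess.TimeoutExpired:
                # ...then let the dedicated reaper wait() on it so this
                # worker greenthread is free to move on.
                reaper_queue.put(proc)
        return 1  # failure response code
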
@@ -554,14 +586,6 @@ class ObjectReplicator(Daemon):
                 _("Nothing replicated for %s seconds."),
                 (time.time() - self.start))

-    def kill_coros(self):
-        """Utility function that kills all coroutines currently running."""
-        for coro in list(self.run_pool.coroutines_running):
-            try:
-                coro.kill(GreenletExit)
-            except GreenletExit:
-                pass
-
     def heartbeat(self):
         """
         Loop that runs in the background during replication. It periodically
@@ -571,19 +595,6 @@ class ObjectReplicator(Daemon):
             eventlet.sleep(self.stats_interval)
             self.stats_line()

-    def detect_lockups(self):
-        """
-        In testing, the pool.waitall() call very occasionally failed to return.
-        This is an attempt to make sure the replicator finishes its replication
-        pass in some eventuality.
-        """
-        while True:
-            eventlet.sleep(self.lockup_timeout)
-            if self.replication_count == self.last_replication_count:
-                self.logger.error(_("Lockup detected.. killing live coros."))
-                self.kill_coros()
-            self.last_replication_count = self.replication_count
-
     def build_replication_jobs(self, policy, ips, override_devices=None,
                                override_partitions=None):
         """
@@ -734,7 +745,6 @@ class ObjectReplicator(Daemon):
             self.handoffs_remaining = 0

         stats = eventlet.spawn(self.heartbeat)
-        lockup_detector = eventlet.spawn(self.detect_lockups)
         eventlet.sleep()  # Give spawns a cycle

         current_nodes = None
@@ -783,8 +793,7 @@ class ObjectReplicator(Daemon):
                 else:
                     self.run_pool.spawn(self.update, job)
             current_nodes = None
-            with Timeout(self.lockup_timeout):
-                self.run_pool.waitall()
+            self.run_pool.waitall()
         except (Exception, Timeout):
             if current_nodes:
                 self._add_failure_stats([(failure_dev['replication_ip'],
@@ -793,14 +802,14 @@ class ObjectReplicator(Daemon):
             else:
                 self._add_failure_stats(self.all_devs_info)
             self.logger.exception(_("Exception in top-level replication loop"))
-            self.kill_coros()
         finally:
             stats.kill()
-            lockup_detector.kill()
             self.stats_line()
             self.stats['attempted'] = self.replication_count

     def run_once(self, *args, **kwargs):
+        rsync_reaper = eventlet.spawn(self._child_process_reaper)
+
         self._zero_stats()
         self.logger.info(_("Running object replicator in script mode."))

@@ -830,8 +839,14 @@ class ObjectReplicator(Daemon):
                          'object_replication_last': replication_last},
                         self.rcache, self.logger)

+        # Give rsync processes one last chance to exit, then bail out and
+        # let them be init's problem
+        self._child_process_reaper_queue.put(None)
+        rsync_reaper.wait()
+
     def run_forever(self, *args, **kwargs):
         self.logger.info(_("Starting object replicator in daemon mode."))
+        eventlet.spawn_n(self._child_process_reaper)
         # Run the replicator continually
         while True:
             self._zero_stats()
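
Note the asymmetry in how the reaper is started above: run_once uses eventlet.spawn so it can enqueue the None sentinel and then join the reaper before the script exits, while run_forever uses eventlet.spawn_n because the daemon never needs to shut it down. A toy illustration of that eventlet difference (not Swift code):

# eventlet.spawn returns a GreenThread you can wait() on for its result;
# eventlet.spawn_n is cheaper, fire-and-forget, and cannot be joined.
import eventlet


def work():
    eventlet.sleep(0.1)
    return 'done'


gt = eventlet.spawn(work)
print(gt.wait())        # cooperatively blocks until work() returns: 'done'

eventlet.spawn_n(work)  # no handle to join; fine for a never-ending helper
eventlet.sleep(0.2)     # yield so the second greenthread gets a chance to run
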

test/unit/obj/test_replicator.py

@@ -30,8 +30,7 @@ from eventlet.green import subprocess
 from eventlet import Timeout, sleep

 from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
-                       mocked_http_conn, mock_check_drive, skip_if_no_xattrs,
-                       SkipTest)
+                       mocked_http_conn, mock_check_drive, skip_if_no_xattrs)
 from swift.common import utils
 from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
                                 storage_directory)
@@ -134,20 +133,28 @@ def _mock_process(ret):


 class MockHungProcess(object):
-    def __init__(self, *args, **kwargs):
+    def __init__(self, waits_needed=1, *args, **kwargs):
         class MockStdout(object):
             def read(self):
                 pass
         self.stdout = MockStdout()
         self._state = 'running'
         self._calls = []
+        self._waits = 0
+        self._waits_needed = waits_needed

-    def wait(self):
+    def wait(self, timeout=None):
         self._calls.append(('wait', self._state))
         if self._state == 'running':
-            # Sleep so we trip either the lockup detector or the rsync timeout
+            # Sleep so we trip the rsync timeout
             sleep(1)
             raise BaseException('You need to mock out some timeouts')
+        elif self._state == 'killed':
+            self._waits += 1
+            if self._waits >= self._waits_needed:
+                return
+            else:
+                raise subprocess.TimeoutExpired('some cmd', timeout)

     def terminate(self):
         self._calls.append(('terminate', self._state))
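
With the waits_needed knob, MockHungProcess can model a child that survives SIGKILL for a while: after kill(), wait() keeps raising subprocess.TimeoutExpired until it has been called waits_needed times. A quick illustration of the sequence the tests below assert on (assumes the class above; its kill() method, unchanged and outside this hunk, is assumed to record the call and flip _state to 'killed'):

# Illustrative only; relies on MockHungProcess as defined above.
from eventlet.green import subprocess

proc = MockHungProcess(waits_needed=2)
proc.kill()                      # recorded as ('kill', 'running')
try:
    proc.wait(timeout=1.0)       # first post-kill wait: still "wedged"
except subprocess.TimeoutExpired:
    pass
proc.wait(timeout=1.0)           # second wait: finally returns
assert proc._calls == [('kill', 'running'),
                       ('wait', 'killed'),
                       ('wait', 'killed')]
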
@@ -2036,38 +2043,6 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertIn(
             "next_part_power set in policy 'one'. Skipping", warnings)

-    def test_replicate_lockup_detector(self):
-        raise SkipTest("this is not a reliable test and must be fixed")
-        cur_part = '0'
-        df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
-                                      policy=POLICIES[0])
-        mkdirs(df._datadir)
-        f = open(os.path.join(df._datadir,
-                              normalize_timestamp(time.time()) + '.data'),
-                 'wb')
-        f.write('1234567890')
-        f.close()
-
-        mock_procs = []
-
-        def new_mock(*a, **kw):
-            proc = MockHungProcess()
-            mock_procs.append(proc)
-            return proc
-
-        with mock.patch('swift.obj.replicator.http_connect',
-                        mock_http_connect(200)), \
-                mock.patch.object(self.replicator, 'lockup_timeout', 0.01), \
-                mock.patch('eventlet.green.subprocess.Popen', new_mock):
-            self.replicator.replicate()
-        for proc in mock_procs:
-            self.assertEqual(proc._calls, [
-                ('wait', 'running'),
-                ('terminate', 'running'),
-                ('wait', 'terminating'),
-            ])
-        self.assertEqual(len(mock_procs), 1)
-
     def test_replicate_rsync_timeout(self):
         cur_part = '0'
         df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
@@ -2090,7 +2065,7 @@ class TestObjectReplicator(unittest.TestCase):
                         mock_http_connect(200)), \
                 mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
                 mock.patch('eventlet.green.subprocess.Popen', new_mock):
-            self.replicator.replicate()
+            self.replicator.run_once()
         for proc in mock_procs:
             self.assertEqual(proc._calls, [
                 ('wait', 'running'),
@@ -2099,5 +2074,38 @@ class TestObjectReplicator(unittest.TestCase):
             ])
         self.assertEqual(len(mock_procs), 2)

+    def test_replicate_rsync_timeout_wedged(self):
+        cur_part = '0'
+        df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
+                                      policy=POLICIES[0])
+        mkdirs(df._datadir)
+        f = open(os.path.join(df._datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+
+        mock_procs = []
+
+        def new_mock(*a, **kw):
+            proc = MockHungProcess(waits_needed=2)
+            mock_procs.append(proc)
+            return proc
+
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)), \
+                mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
+                mock.patch('eventlet.green.subprocess.Popen', new_mock):
+            self.replicator.run_once()
+        for proc in mock_procs:
+            self.assertEqual(proc._calls, [
+                ('wait', 'running'),
+                ('kill', 'running'),
+                ('wait', 'killed'),
+                ('wait', 'killed'),
+            ])
+        self.assertEqual(len(mock_procs), 2)
+
+
 if __name__ == '__main__':
     unittest.main()