Remove object replicator's lockup detector/mitigator.

Sometimes, an rsync process just won't die. You can send SIGKILL, but
it isn't very effective. This is sometimes caused by I/O against a
failing disk: with some disks, an rsync process won't die until Linux
finishes the current I/O operation (whether it succeeds or fails), but
the disk can't succeed and retries forever instead of failing. The net
effect is an unkillable rsync process.

The replicator was dealing with this by sending SIGKILL to any rsync
that ran too long, then calling waitpid() in a loop[1] until the rsync
died so it could reap the child process. This worked pretty well
unless it met an unkillable rsync; in that case, one greenthread would
end up blocked for a very long time. Since the replicator's main loop
works by (a) gathering all replication jobs, (b) performing them in
parallel with some limited concurrency, then (c) waiting for all jobs
to complete, an unkillable rsync would block the entire replicator.
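
To make the failure mode concrete, here is a minimal sketch (not
Swift's actual code; the job list is a stand-in) of why a single stuck
rsync stalls the whole pass: pool.waitall() cannot return until every
greenthread finishes, including the one blocked in proc.wait().

    from eventlet import GreenPool
    from eventlet.green import subprocess

    def run_rsync(args):
        # One replication job: run rsync and wait for it. If the
        # process never exits, this greenthread is stuck here for good.
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        proc.stdout.read()
        return proc.wait()

    pool = GreenPool(size=20)
    jobs = [['rsync', '--version']]  # stand-in for real replication jobs
    for args in jobs:
        pool.spawn(run_rsync, args)  # (b) run jobs with limited concurrency
    pool.waitall()                   # (c) blocks until *every* job returns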

There was an attempt to address this by adding a lockup detector: if
the replicator failed to complete any replication cycle in N seconds
[2], all greenthreads except the main one would be terminated and the
replication cycle restarted. This worked okay, but it only handled
total failure. If you have 20 greenthreads working and 19 of them are
blocked on unkillable rsyncs, then as long as the 20th greenthread
manages to replicate at least one partition every N seconds, the
lockup detector never fires and the replicator just keeps limping
along.
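
The check the detector made each interval amounted to a counter
comparison; paraphrased here for illustration (this is not the removed
method itself, which appears in the diff below):

    def lockup_detected(replication_count, last_replication_count):
        # Fires only when *no* partition at all replicated in the window.
        return replication_count == last_replication_count

    # 19 of 20 greenthreads wedged, but one partition finished during
    # the window, so the counter moved and nothing gets killed:
    print(lockup_detected(replication_count=101,
                          last_replication_count=100))  # False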

This commit removes the lockup detector. Instead, when a replicator
greenthread happens upon an rsync that doesn't die promptly after
receiving SIGKILL, the process handle is sent to a background
greenthread; that background greenthread simply waits for those rsync
processes to finally die and reaps them. This lets the replicator make
better progress in the presence of unkillable rsyncs.
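
In sketch form (a paraphrase of the new code in the diff below, with
illustrative names and an illustrative one-second grace period), the
handoff works roughly like this:

    import eventlet
    from eventlet import queue
    from eventlet.green import subprocess

    reaper_queue = queue.LightQueue()

    def reap_if_done(proc):
        try:
            # Reaps the process if it has exited; otherwise returns
            # immediately without blocking.
            proc.wait(timeout=0)
            return True
        except subprocess.TimeoutExpired:
            return False

    def child_process_reaper():
        # Background greenthread: keep polling handed-off processes
        # until they finally die, without ever blocking a worker.
        procs = set()
        while True:
            try:
                proc = reaper_queue.get(timeout=60 if procs else None)
                if proc is None:
                    break  # shutdown sentinel
                procs.add(proc)
            except queue.Empty:
                pass
            procs -= {p for p in procs if reap_if_done(p)}

    def kill_rsync(proc):
        # Worker side: SIGKILL, give the process a moment to exit,
        # then hand it to the reaper if it is still hanging around.
        proc.kill()
        try:
            proc.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            reaper_queue.put(proc)

    eventlet.spawn_n(child_process_reaper)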

[1] It's a call to subprocess.Popen.wait(); the looping and sleeping
happen in eventlet.

[2] The default is 1800 seconds = 30 minutes, but the value is
configurable.

Change-Id: If6dc7b003e18ab4e8a5ed687c965025ebd417dfa
Author: Samuel Merritt
Date: 2018-03-12 17:58:23 -07:00
Commit: ecf8ae50e1 (parent 9aca9ad780)
2 changed files with 105 additions and 82 deletions

swift/obj/replicator.py

@@ -25,9 +25,8 @@ import six.moves.cPickle as pickle
from swift import gettext_ as _
import eventlet
from eventlet import GreenPool, tpool, Timeout, sleep
from eventlet import GreenPool, queue, tpool, Timeout, sleep
from eventlet.green import subprocess
from eventlet.support.greenlets import GreenletExit
from swift.common.constraints import check_drive
from swift.common.ring.utils import is_local_device
@@ -90,7 +89,6 @@ class ObjectReplicator(Daemon):
if not self.rsync_module:
self.rsync_module = '{replication_ip}::object'
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
@@ -113,6 +111,7 @@ class ObjectReplicator(Daemon):
'handoff_delete before the next '
'normal rebalance')
self._df_router = DiskFileRouter(conf, self.logger)
self._child_process_reaper_queue = queue.LightQueue()
def _zero_stats(self):
"""Zero out the stats."""
@@ -138,6 +137,36 @@ class ObjectReplicator(Daemon):
my_replication_ips.add(local_dev['replication_ip'])
return list(my_replication_ips)
def _child_process_reaper(self):
"""
Consume processes from self._child_process_reaper_queue and wait() for
them
"""
procs = set()
done = False
while not done:
timeout = 60 if procs else None
try:
new_proc = self._child_process_reaper_queue.get(
timeout=timeout)
if new_proc is not None:
procs.add(new_proc)
else:
done = True
except queue.Empty:
pass
reaped_procs = set()
for proc in procs:
try:
# this will reap the process if it has exited, but
# otherwise will not wait
proc.wait(timeout=0)
reaped_procs.add(proc)
except subprocess.TimeoutExpired:
pass
procs -= reaped_procs
# Just exists for doc anchor point
def sync(self, node, job, suffixes, *args, **kwargs):
"""
@@ -169,7 +198,7 @@ class ObjectReplicator(Daemon):
:returns: return code of rsync process. 0 is successful
"""
start_time = time.time()
proc = ret_val = None
proc = None
try:
with Timeout(self.rsync_timeout):
@@ -178,25 +207,28 @@ class ObjectReplicator(Daemon):
stderr=subprocess.STDOUT)
results = proc.stdout.read()
ret_val = proc.wait()
except GreenletExit:
self.logger.error(_("Killing by lockup detector"))
if proc:
# Assume rsync is still responsive and give it a chance
# to shut down gracefully
proc.terminate()
# Final good-faith effort to clean up the process table.
# Note that this blocks, but worst-case we wait for the
# lockup detector to come around and kill us. This can
# happen if the process is stuck down in kernel-space
# waiting on I/O or something.
proc.wait()
raise
except Timeout:
self.logger.error(_("Killing long-running rsync: %s"), str(args))
if proc:
proc.kill()
proc.wait()
try:
# Note: Python 2.7's subprocess.Popen class doesn't take
# any arguments for wait(), but Python 3's does.
# However, Eventlet's replacement Popen takes a timeout
# argument regardless of Python version, so we don't
# need any conditional code here.
proc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
# Sometimes a process won't die immediately even after a
# SIGKILL. This can be due to failing disks, high load,
# or other reasons. We can't wait for it forever since
# we're taking up a slot in the (green)thread pool, so
# we send it over to another greenthread, not part of
# our pool, whose sole duty is to wait for child
# processes to exit.
self._child_process_reaper_queue.put(proc)
return 1 # failure response code
total_time = time.time() - start_time
for result in results.split('\n'):
if result == '':
@@ -554,14 +586,6 @@ class ObjectReplicator(Daemon):
_("Nothing replicated for %s seconds."),
(time.time() - self.start))
def kill_coros(self):
"""Utility function that kills all coroutines currently running."""
for coro in list(self.run_pool.coroutines_running):
try:
coro.kill(GreenletExit)
except GreenletExit:
pass
def heartbeat(self):
"""
Loop that runs in the background during replication. It periodically
@@ -571,19 +595,6 @@ class ObjectReplicator(Daemon):
eventlet.sleep(self.stats_interval)
self.stats_line()
def detect_lockups(self):
"""
In testing, the pool.waitall() call very occasionally failed to return.
This is an attempt to make sure the replicator finishes its replication
pass in some eventuality.
"""
while True:
eventlet.sleep(self.lockup_timeout)
if self.replication_count == self.last_replication_count:
self.logger.error(_("Lockup detected.. killing live coros."))
self.kill_coros()
self.last_replication_count = self.replication_count
def build_replication_jobs(self, policy, ips, override_devices=None,
override_partitions=None):
"""
@@ -734,7 +745,6 @@ class ObjectReplicator(Daemon):
self.handoffs_remaining = 0
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle
current_nodes = None
@@ -783,8 +793,7 @@ class ObjectReplicator(Daemon):
else:
self.run_pool.spawn(self.update, job)
current_nodes = None
with Timeout(self.lockup_timeout):
self.run_pool.waitall()
self.run_pool.waitall()
except (Exception, Timeout):
if current_nodes:
self._add_failure_stats([(failure_dev['replication_ip'],
@@ -793,14 +802,14 @@ class ObjectReplicator(Daemon):
else:
self._add_failure_stats(self.all_devs_info)
self.logger.exception(_("Exception in top-level replication loop"))
self.kill_coros()
finally:
stats.kill()
lockup_detector.kill()
self.stats_line()
self.stats['attempted'] = self.replication_count
def run_once(self, *args, **kwargs):
rsync_reaper = eventlet.spawn(self._child_process_reaper)
self._zero_stats()
self.logger.info(_("Running object replicator in script mode."))
@@ -830,8 +839,14 @@ class ObjectReplicator(Daemon):
'object_replication_last': replication_last},
self.rcache, self.logger)
# Give rsync processes one last chance to exit, then bail out and
# let them be init's problem
self._child_process_reaper_queue.put(None)
rsync_reaper.wait()
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object replicator in daemon mode."))
eventlet.spawn_n(self._child_process_reaper)
# Run the replicator continually
while True:
self._zero_stats()

test/unit/obj/test_replicator.py

@@ -30,8 +30,7 @@ from eventlet.green import subprocess
from eventlet import Timeout, sleep
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
mocked_http_conn, mock_check_drive, skip_if_no_xattrs,
SkipTest)
mocked_http_conn, mock_check_drive, skip_if_no_xattrs)
from swift.common import utils
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
storage_directory)
@@ -134,20 +133,28 @@ def _mock_process(ret):
class MockHungProcess(object):
def __init__(self, *args, **kwargs):
def __init__(self, waits_needed=1, *args, **kwargs):
class MockStdout(object):
def read(self):
pass
self.stdout = MockStdout()
self._state = 'running'
self._calls = []
self._waits = 0
self._waits_needed = waits_needed
def wait(self):
def wait(self, timeout=None):
self._calls.append(('wait', self._state))
if self._state == 'running':
# Sleep so we trip either the lockup detector or the rsync timeout
# Sleep so we trip the rsync timeout
sleep(1)
raise BaseException('You need to mock out some timeouts')
elif self._state == 'killed':
self._waits += 1
if self._waits >= self._waits_needed:
return
else:
raise subprocess.TimeoutExpired('some cmd', timeout)
def terminate(self):
self._calls.append(('terminate', self._state))
@@ -2036,38 +2043,6 @@ class TestObjectReplicator(unittest.TestCase):
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
def test_replicate_lockup_detector(self):
raise SkipTest("this is not a reliable test and must be fixed")
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES[0])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
mock_procs = []
def new_mock(*a, **kw):
proc = MockHungProcess()
mock_procs.append(proc)
return proc
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'lockup_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.replicate()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
('terminate', 'running'),
('wait', 'terminating'),
])
self.assertEqual(len(mock_procs), 1)
def test_replicate_rsync_timeout(self):
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
@@ -2090,7 +2065,7 @@ class TestObjectReplicator(unittest.TestCase):
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.replicate()
self.replicator.run_once()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
@@ -2099,5 +2074,38 @@ class TestObjectReplicator(unittest.TestCase):
])
self.assertEqual(len(mock_procs), 2)
def test_replicate_rsync_timeout_wedged(self):
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES[0])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
mock_procs = []
def new_mock(*a, **kw):
proc = MockHungProcess(waits_needed=2)
mock_procs.append(proc)
return proc
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.run_once()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
('kill', 'running'),
('wait', 'killed'),
('wait', 'killed'),
])
self.assertEqual(len(mock_procs), 2)
if __name__ == '__main__':
unittest.main()