diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index f60ad502fd..01354f987a 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -25,9 +25,8 @@ import six.moves.cPickle as pickle from swift import gettext_ as _ import eventlet -from eventlet import GreenPool, tpool, Timeout, sleep +from eventlet import GreenPool, queue, tpool, Timeout, sleep from eventlet.green import subprocess -from eventlet.support.greenlets import GreenletExit from swift.common.constraints import check_drive from swift.common.ring.utils import is_local_device @@ -90,7 +89,6 @@ class ObjectReplicator(Daemon): if not self.rsync_module: self.rsync_module = '{replication_ip}::object' self.http_timeout = int(conf.get('http_timeout', 60)) - self.lockup_timeout = int(conf.get('lockup_timeout', 1800)) self.recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift') self.rcache = os.path.join(self.recon_cache_path, "object.recon") @@ -113,6 +111,7 @@ class ObjectReplicator(Daemon): 'handoff_delete before the next ' 'normal rebalance') self._df_router = DiskFileRouter(conf, self.logger) + self._child_process_reaper_queue = queue.LightQueue() def _zero_stats(self): """Zero out the stats.""" @@ -138,6 +137,36 @@ class ObjectReplicator(Daemon): my_replication_ips.add(local_dev['replication_ip']) return list(my_replication_ips) + def _child_process_reaper(self): + """ + Consume processes from self._child_process_reaper_queue and wait() for + them + """ + procs = set() + done = False + while not done: + timeout = 60 if procs else None + try: + new_proc = self._child_process_reaper_queue.get( + timeout=timeout) + if new_proc is not None: + procs.add(new_proc) + else: + done = True + except queue.Empty: + pass + + reaped_procs = set() + for proc in procs: + try: + # this will reap the process if it has exited, but + # otherwise will not wait + proc.wait(timeout=0) + reaped_procs.add(proc) + except subprocess.TimeoutExpired: + pass + procs -= reaped_procs + # Just exists for doc anchor point def sync(self, node, job, suffixes, *args, **kwargs): """ @@ -169,7 +198,7 @@ class ObjectReplicator(Daemon): :returns: return code of rsync process. 0 is successful """ start_time = time.time() - proc = ret_val = None + proc = None try: with Timeout(self.rsync_timeout): @@ -178,25 +207,28 @@ class ObjectReplicator(Daemon): stderr=subprocess.STDOUT) results = proc.stdout.read() ret_val = proc.wait() - except GreenletExit: - self.logger.error(_("Killing by lockup detector")) - if proc: - # Assume rsync is still responsive and give it a chance - # to shut down gracefully - proc.terminate() - # Final good-faith effort to clean up the process table. - # Note that this blocks, but worst-case we wait for the - # lockup detector to come around and kill us. This can - # happen if the process is stuck down in kernel-space - # waiting on I/O or something. - proc.wait() - raise except Timeout: self.logger.error(_("Killing long-running rsync: %s"), str(args)) if proc: proc.kill() - proc.wait() + try: + # Note: Python 2.7's subprocess.Popen class doesn't take + # any arguments for wait(), but Python 3's does. + # However, Eventlet's replacement Popen takes a timeout + # argument regardless of Python version, so we don't + # need any conditional code here. + proc.wait(timeout=1.0) + except subprocess.TimeoutExpired: + # Sometimes a process won't die immediately even after a + # SIGKILL. This can be due to failing disks, high load, + # or other reasons. We can't wait for it forever since + # we're taking up a slot in the (green)thread pool, so + # we send it over to another greenthread, not part of + # our pool, whose sole duty is to wait for child + # processes to exit. + self._child_process_reaper_queue.put(proc) return 1 # failure response code + total_time = time.time() - start_time for result in results.split('\n'): if result == '': @@ -554,14 +586,6 @@ class ObjectReplicator(Daemon): _("Nothing replicated for %s seconds."), (time.time() - self.start)) - def kill_coros(self): - """Utility function that kills all coroutines currently running.""" - for coro in list(self.run_pool.coroutines_running): - try: - coro.kill(GreenletExit) - except GreenletExit: - pass - def heartbeat(self): """ Loop that runs in the background during replication. It periodically @@ -571,19 +595,6 @@ class ObjectReplicator(Daemon): eventlet.sleep(self.stats_interval) self.stats_line() - def detect_lockups(self): - """ - In testing, the pool.waitall() call very occasionally failed to return. - This is an attempt to make sure the replicator finishes its replication - pass in some eventuality. - """ - while True: - eventlet.sleep(self.lockup_timeout) - if self.replication_count == self.last_replication_count: - self.logger.error(_("Lockup detected.. killing live coros.")) - self.kill_coros() - self.last_replication_count = self.replication_count - def build_replication_jobs(self, policy, ips, override_devices=None, override_partitions=None): """ @@ -734,7 +745,6 @@ class ObjectReplicator(Daemon): self.handoffs_remaining = 0 stats = eventlet.spawn(self.heartbeat) - lockup_detector = eventlet.spawn(self.detect_lockups) eventlet.sleep() # Give spawns a cycle current_nodes = None @@ -783,8 +793,7 @@ class ObjectReplicator(Daemon): else: self.run_pool.spawn(self.update, job) current_nodes = None - with Timeout(self.lockup_timeout): - self.run_pool.waitall() + self.run_pool.waitall() except (Exception, Timeout): if current_nodes: self._add_failure_stats([(failure_dev['replication_ip'], @@ -793,14 +802,14 @@ class ObjectReplicator(Daemon): else: self._add_failure_stats(self.all_devs_info) self.logger.exception(_("Exception in top-level replication loop")) - self.kill_coros() finally: stats.kill() - lockup_detector.kill() self.stats_line() self.stats['attempted'] = self.replication_count def run_once(self, *args, **kwargs): + rsync_reaper = eventlet.spawn(self._child_process_reaper) + self._zero_stats() self.logger.info(_("Running object replicator in script mode.")) @@ -830,8 +839,14 @@ class ObjectReplicator(Daemon): 'object_replication_last': replication_last}, self.rcache, self.logger) + # Give rsync processes one last chance to exit, then bail out and + # let them be init's problem + self._child_process_reaper_queue.put(None) + rsync_reaper.wait() + def run_forever(self, *args, **kwargs): self.logger.info(_("Starting object replicator in daemon mode.")) + eventlet.spawn_n(self._child_process_reaper) # Run the replicator continually while True: self._zero_stats() diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 7c711e56b8..0d061b4ec9 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -30,8 +30,7 @@ from eventlet.green import subprocess from eventlet import Timeout, sleep from test.unit import (debug_logger, patch_policies, make_timestamp_iter, - mocked_http_conn, mock_check_drive, skip_if_no_xattrs, - SkipTest) + mocked_http_conn, mock_check_drive, skip_if_no_xattrs) from swift.common import utils from swift.common.utils import (hash_path, mkdirs, normalize_timestamp, storage_directory) @@ -134,20 +133,28 @@ def _mock_process(ret): class MockHungProcess(object): - def __init__(self, *args, **kwargs): + def __init__(self, waits_needed=1, *args, **kwargs): class MockStdout(object): def read(self): pass self.stdout = MockStdout() self._state = 'running' self._calls = [] + self._waits = 0 + self._waits_needed = waits_needed - def wait(self): + def wait(self, timeout=None): self._calls.append(('wait', self._state)) if self._state == 'running': - # Sleep so we trip either the lockup detector or the rsync timeout + # Sleep so we trip the rsync timeout sleep(1) raise BaseException('You need to mock out some timeouts') + elif self._state == 'killed': + self._waits += 1 + if self._waits >= self._waits_needed: + return + else: + raise subprocess.TimeoutExpired('some cmd', timeout) def terminate(self): self._calls.append(('terminate', self._state)) @@ -2036,38 +2043,6 @@ class TestObjectReplicator(unittest.TestCase): self.assertIn( "next_part_power set in policy 'one'. Skipping", warnings) - def test_replicate_lockup_detector(self): - raise SkipTest("this is not a reliable test and must be fixed") - cur_part = '0' - df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o', - policy=POLICIES[0]) - mkdirs(df._datadir) - f = open(os.path.join(df._datadir, - normalize_timestamp(time.time()) + '.data'), - 'wb') - f.write('1234567890') - f.close() - - mock_procs = [] - - def new_mock(*a, **kw): - proc = MockHungProcess() - mock_procs.append(proc) - return proc - - with mock.patch('swift.obj.replicator.http_connect', - mock_http_connect(200)), \ - mock.patch.object(self.replicator, 'lockup_timeout', 0.01), \ - mock.patch('eventlet.green.subprocess.Popen', new_mock): - self.replicator.replicate() - for proc in mock_procs: - self.assertEqual(proc._calls, [ - ('wait', 'running'), - ('terminate', 'running'), - ('wait', 'terminating'), - ]) - self.assertEqual(len(mock_procs), 1) - def test_replicate_rsync_timeout(self): cur_part = '0' df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o', @@ -2090,7 +2065,7 @@ class TestObjectReplicator(unittest.TestCase): mock_http_connect(200)), \ mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \ mock.patch('eventlet.green.subprocess.Popen', new_mock): - self.replicator.replicate() + self.replicator.run_once() for proc in mock_procs: self.assertEqual(proc._calls, [ ('wait', 'running'), @@ -2099,5 +2074,38 @@ class TestObjectReplicator(unittest.TestCase): ]) self.assertEqual(len(mock_procs), 2) + def test_replicate_rsync_timeout_wedged(self): + cur_part = '0' + df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o', + policy=POLICIES[0]) + mkdirs(df._datadir) + f = open(os.path.join(df._datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + + mock_procs = [] + + def new_mock(*a, **kw): + proc = MockHungProcess(waits_needed=2) + mock_procs.append(proc) + return proc + + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)), \ + mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \ + mock.patch('eventlet.green.subprocess.Popen', new_mock): + self.replicator.run_once() + for proc in mock_procs: + self.assertEqual(proc._calls, [ + ('wait', 'running'), + ('kill', 'running'), + ('wait', 'killed'), + ('wait', 'killed'), + ]) + self.assertEqual(len(mock_procs), 2) + + if __name__ == '__main__': unittest.main()