diff --git a/swift/common/manager.py b/swift/common/manager.py index 089895044e..8e78ff4428 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -43,6 +43,7 @@ KILL_WAIT = 15 # seconds to wait for servers to die MAX_DESCRIPTORS = 32768 MAX_MEMORY = (1024 * 1024 * 1024) * 2 # 2 GB + def setup_env(): """Try to increase resource limits of the OS. Move PYTHON_EGG_CACHE to /tmp """ @@ -58,6 +59,7 @@ def setup_env(): os.environ['PYTHON_EGG_CACHE'] = '/tmp' return + def command(func): """ Decorator to declare which methods are accessible as commands, commands @@ -74,6 +76,44 @@ def command(func): return wrapped +def watch_server_pids(server_pids, interval=1, **kwargs): + """Monitor a collection of server pids yeilding back those pids that + aren't responding to signals. + + :param server_pids: a dict, lists of pids [int,...] keyed on + Server objects + """ + status = {} + start = time.time() + end = start + interval + server_pids = dict(server_pids) # make a copy + while interval: + for server, pids in server_pids.items(): + for pid in pids: + try: + # let pid stop if it wants to + os.waitpid(pid, os.WNOHANG) + except OSError, e: + if e.errno not in (errno.ECHILD, errno.ESRCH): + raise # else no such child/process + # check running pids for server + status[server] = server.get_running_pids(**kwargs) + for pid in pids: + # original pids no longer in running pids! + if pid not in status[server]: + yield server, pid + # update active pids list using running_pids + server_pids[server] = status[server] + if not [p for server, pids in status.items() for p in pids]: + # no more running pids + break + if time.time() > end: + break + else: + time.sleep(0.1) + return + + class UnknownCommandError(Exception): pass @@ -105,42 +145,6 @@ class Manager(): for name in server_names: self.servers.add(Server(name)) - def watch_server_pids(self, server_pids, interval=0, **kwargs): - """Monitor a collection of server pids yeilding back those pids that - aren't responding to signals. - - :param server_pids: a dict, lists of pids [int,...] keyed on - Server objects - """ - status = {} - start = time.time() - end = start + interval - while interval: - for server, pids in server_pids.items(): - for pid in pids: - try: - # let pid stop if it wants to - os.waitpid(pid, os.WNOHANG) - except OSError, e: - if e.errno not in (errno.ECHILD, errno.ESRCH): - raise # else no such child/process - # check running pids for server - status[server] = server.get_running_pids(**kwargs) - for pid in pids: - # original pids no longer in running pids! - if pid not in status[server]: - yield server, pid - # update active pids list using running_pids - server_pids[server] = status[server] - if not [p for server, pids in status.items() for p in pids]: - # no more running pids - break - if time.time() > end: - break - else: - time.sleep(0.1) - return - @command def status(self, **kwargs): """display status of tracked pids for server @@ -206,11 +210,11 @@ class Manager(): server_pids[server] = signaled_pids # all signaled_pids, i.e. list(itertools.chain(*server_pids.values())) - signaled_pids = [p for server, pid in server_pids.items() for p in pid] + signaled_pids = [p for server, pids in server_pids.items() for p in pids] # keep track of the pids yeiled back as killed for all servers killed_pids = set() - for server, killed_pid in self.watch_server_pids(server_pids, - interval=KILL_WAIT, **kwargs): + for server, killed_pid in watch_server_pids(server_pids, + interval=KILL_WAIT, **kwargs): print "%s (%s) appears to have stopped" % (server, killed_pid) killed_pids.add(killed_pid) if not killed_pids.symmetric_difference(signaled_pids): @@ -230,7 +234,7 @@ class Manager(): """ kwargs['graceful'] = True status = 0 - self.stop(**kwargs) + status += self.stop(**kwargs) return status @command @@ -238,8 +242,8 @@ class Manager(): """stops then restarts server """ status = 0 - self.stop(**kwargs) - self.start(**kwargs) + status += self.stop(**kwargs) + status += self.start(**kwargs) return status @command diff --git a/test/unit/common/test_manager.py b/test/unit/common/test_manager.py index c48829d2c7..e385082cb8 100644 --- a/test/unit/common/test_manager.py +++ b/test/unit/common/test_manager.py @@ -31,6 +31,7 @@ from swift.common import manager DUMMY_SIG = 1 + class MockOs(): def __init__(self, pids): @@ -79,7 +80,7 @@ class TestManagerModule(unittest.TestCase): def __init__(self, error=None): self.error = error self.called_with_args = [] - + def setrlimit(self, resource, limits): if self.error: raise self.error @@ -121,8 +122,6 @@ class TestManagerModule(unittest.TestCase): finally: manager.resource = _orig_resource os.environ = _orig_environ - - def test_command_wrapper(self): @manager.command @@ -139,9 +138,122 @@ class TestManagerModule(unittest.TestCase): self.assert_(hasattr(myfunc, 'publicly_accessible')) self.assert_(myfunc.publicly_accessible) + def test_watch_server_pids(self): + class MockOs(): + WNOHANG = os.WNOHANG + + def __init__(self, pid_map={}): + self.pid_map = {} + for pid, v in pid_map.items(): + self.pid_map[pid] = (x for x in v) + + def waitpid(self, pid, options): + try: + rv = self.pid_map[pid].next() + except StopIteration: + raise OSError(errno.ECHILD, os.strerror(errno.ECHILD)) + except KeyError: + raise OSError(errno.ESRCH, os.strerror(errno.ESRCH)) + if isinstance(rv, Exception): + raise rv + else: + return rv + + class MockTime(): + def __init__(self, ticks=None): + self.tock = time() + if not ticks: + ticks = [] + + self.ticks = (t for t in ticks) + + def time(self): + try: + self.tock += self.ticks.next() + except StopIteration: + self.tock += 1 + return self.tock + + def sleep(*args): + return + + class MockServer(): + + def __init__(self, pids, zombie=0): + self.heartbeat = (pids for _ in range(zombie)) + + def get_running_pids(self): + try: + rv = self.heartbeat.next() + return rv + except StopIteration: + return {} + + _orig_os = manager.os + _orig_time = manager.time + _orig_server = manager.Server + try: + manager.time = MockTime() + manager.os = MockOs() + # this server always says it's dead when you ask for running pids + server = MockServer([1]) + # list of pids keyed on servers to watch + server_pids = { + server: [1], + } + # basic test, server dies + gen = manager.watch_server_pids(server_pids) + expected = [(server, 1)] + self.assertEquals([x for x in gen], [(server, 1)]) + # start long running server and short interval + server = MockServer([1], zombie=15) + server_pids = { + server: [1], + } + gen = manager.watch_server_pids(server_pids) + self.assertEquals([x for x in gen], []) + # wait a little longer + gen = manager.watch_server_pids(server_pids, interval=15) + self.assertEquals([x for x in gen], [(server, 1)]) + # zombie process + server = MockServer([1], zombie=200) + server_pids = { + server: [1], + } + # test weird os error + manager.os = MockOs({1: [OSError()]}) + gen = manager.watch_server_pids(server_pids) + self.assertRaises(OSError, lambda: [x for x in gen]) + # test multi-server + server1 = MockServer([1, 10], zombie=200) + server2 = MockServer([2, 20], zombie=8) + server_pids = { + server1: [1, 10], + server2: [2, 20], + } + pid_map = { + 1: [None for _ in range(10)], + 2: [None for _ in range(8)], + 20: [None for _ in range(4)], + } + manager.os = MockOs(pid_map) + gen = manager.watch_server_pids(server_pids, + interval=manager.KILL_WAIT) + expected = [ + (server2, 2), + (server2, 20), + ] + self.assertEquals([x for x in gen], expected) + + finally: + manager.os = _orig_os + manager.time = _orig_time + manager.Server = _orig_server + def test_exc(self): self.assert_(issubclass(manager.UnknownCommandError, Exception)) + class TestServer(unittest.TestCase): def tearDown(self): @@ -876,7 +988,7 @@ class TestServer(unittest.TestCase): procs.append(MockProcess(fail=fail)) server.procs = procs self.assert_(server.interact() > 0) - + def test_launch(self): # stubs ini_files = ( @@ -987,8 +1099,6 @@ class TestServer(unittest.TestCase): 'number': 4 } self.assertEquals(mock_spawn.kwargs, [expected]) - - finally: sys.stdout = old_stdout @@ -1047,6 +1157,7 @@ class TestServer(unittest.TestCase): self.assertFalse(os.path.exists(conf4)) self.assertFalse(os.path.exists(conf3)) + class TestManager(unittest.TestCase): def test_create(self): @@ -1100,93 +1211,13 @@ class TestManager(unittest.TestCase): for s in m.servers: self.assert_(str(s) in replicators) - - def test_watch_server_pids(self): - class MockOs(): - WNOHANG = os.WNOHANG - def __init__(self, pid_map={}): - self.pid_map = {} - for pid,v in pid_map.items(): - self.pid_map[pid] = (x for x in v) - def waitpid(self, pid, options): - try: - rv = self.pid_map[pid].next() - except StopIteration: - raise OSError(errno.ECHILD, os.strerror(errno.ECHILD)) - except KeyError: - raise OSError(errno.ESRCH, os.strerror(errno.ESRCH)) - if isinstance(rv, Exception): - raise rv() - else: - return rv - - class MockTime(): - def __init__(self, ticks=None): - self.tock = time() - if ticks: - ticks = [t for t in ticks] - else: - ticks = [] - - self.ticks = (t for t in ticks) - - def time(self): - try: - self.tock += self.ticks.next() - except StopIteration: - self.tock += 1 - return self.tock - - class MockServer(): - def __init__(self, server): - self.server = server - - def get_running_pids(self): - return {} - - _orig_os = manager.os - _orig_time = manager.time - _orig_server= manager.Server - try: - manager.os = MockOs() - manager.time = MockTime() - manager.Server = MockServer - m = manager.Manager(['test']) - self.assertEquals(len(m.servers), 1) - server = m.servers.pop() - server_pids = { - server: [1], - } - # test default interval - gen = m.watch_server_pids(server_pids) - self.assertEquals([x for x in gen], []) - # test small interval - gen = m.watch_server_pids(server_pids, interval=1) - # TODO: More tests! - #self.assertEquals([x for x in gen], []) - finally: - manager.os = _orig_os - manager.time = _orig_time - manager.Server = _orig_server - - def test_get_command(self): - raise SkipTest - - def test_list_commands(self): - for cmd, help in manager.Manager.list_commands(): - method = getattr(manager.Manager, cmd.replace('-', '_'), None) - self.assert_(method, '%s is not a command' % cmd) - self.assert_(getattr(method, 'publicly_accessible', False)) - self.assertEquals(method.__doc__.strip(), help) - - def test_run_command(self): - raise SkipTest - def test_status(self): class MockServer(): + def __init__(self, server): self.server = server self.called_kwargs = [] + def status(self, **kwargs): self.called_kwargs.append(kwargs) if 'error' in self.server: @@ -1216,8 +1247,8 @@ class TestManager(unittest.TestCase): def test_start(self): def mock_setup_env(): getattr(mock_setup_env, 'called', []).append(True) - class MockServer(): + class MockServer(): def __init__(self, server): self.server = server self.called = defaultdict(list) @@ -1231,16 +1262,15 @@ class TestManager(unittest.TestCase): return 1 else: return 0 - + + def stop(self, **kwargs): + self.called['stop'].append(kwargs) + def interact(self, **kwargs): self.called['interact'].append(kwargs) - # TODO: test user quit - """ if 'raise' in self.server: raise KeyboardInterrupt - el - """ - if 'error' in self.server: + elif 'error' in self.server: return 1 else: return 0 @@ -1266,7 +1296,7 @@ class TestManager(unittest.TestCase): for server in m.servers: self.assertEquals(server.called['launch'], [kwargs]) self.assertEquals(server.called['wait'], [kwargs]) - + # test interact m = manager.Manager(['proxy', 'error']) kwargs = {'daemon': False} @@ -1275,11 +1305,14 @@ class TestManager(unittest.TestCase): for server in m.servers: self.assertEquals(server.called['launch'], [kwargs]) self.assertEquals(server.called['interact'], [kwargs]) + m = manager.Manager(['raise']) + kwargs = {'daemon': False} + status = m.start(**kwargs) + finally: manager.setup_env = old_setup_env manager.Server = old_swift_server - def test_wait(self): class MockServer(): def __init__(self, server): @@ -1341,7 +1374,6 @@ class TestManager(unittest.TestCase): self.assert_(called_kwargs['once']) finally: manager.Server = orig_swift_server - def test_no_daemon(self): class MockServer(): @@ -1387,7 +1419,6 @@ class TestManager(unittest.TestCase): def launch(self, **kwargs): return self.called['launch'].append(kwargs) - orig_swift_server = manager.Server try: @@ -1408,25 +1439,89 @@ class TestManager(unittest.TestCase): self.assertEquals(len(server.called['interact']), 0) finally: manager.Server = orig_swift_server - def test_stop(self): - raise SkipTest + class MockServerFactory(): + class MockServer(): + def __init__(self, pids): + self.pids = pids + def stop(self, **kwargs): + return self.pids + + def __init__(self, server_pids): + self.server_pids = server_pids + + def __call__(self, server): + return MockServerFactory.MockServer(self.server_pids[server]) + + + def mock_watch_server_pids(server_pids, **kwargs): + for server, pids in server_pids.items(): + for pid in pids: + if pid is None: + continue + yield server, pid + + _orig_server = manager.Server + _orig_watch_server_pids = manager.watch_server_pids + try: + manager.watch_server_pids = mock_watch_server_pids + # test stop one server + server_pids = { + 'test': [1] + } + manager.Server = MockServerFactory(server_pids) + m = manager.Manager(['test']) + status = m.stop() + self.assertEquals(status, 0) + # test not running + server_pids = { + 'test': [] + } + manager.Server = MockServerFactory(server_pids) + m = manager.Manager(['test']) + status = m.stop() + self.assertEquals(status, 1) + # test won't die + server_pids = { + 'test': [None] + } + manager.Server = MockServerFactory(server_pids) + m = manager.Manager(['test']) + status = m.stop() + self.assertEquals(status, 1) + + finally: + manager.Server = _orig_server + manager.watch_server_pids = _orig_watch_server_pids + + # TODO: more tests def test_shutdown(self): - raise SkipTest + pass def test_restart(self): - raise SkipTest + pass def test_reload(self): - raise SkipTest + pass def test_force_reload(self): - raise SkipTest + pass + def test_get_command(self): + pass + + def test_list_commands(self): + for cmd, help in manager.Manager.list_commands(): + method = getattr(manager.Manager, cmd.replace('-', '_'), None) + self.assert_(method, '%s is not a command' % cmd) + self.assert_(getattr(method, 'publicly_accessible', False)) + self.assertEquals(method.__doc__.strip(), help) + + def test_run_command(self): + pass if __name__ == '__main__': unittest.main() -