diff --git a/neutron/agent/metadata/agent.py b/neutron/agent/metadata/agent.py index 569a99171a..a678edab71 100644 --- a/neutron/agent/metadata/agent.py +++ b/neutron/agent/metadata/agent.py @@ -40,7 +40,6 @@ from neutron.openstack.common.cache import cache from neutron.openstack.common import excutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common import service from neutron import wsgi LOG = logging.getLogger(__name__) @@ -280,16 +279,8 @@ class UnixDomainWSGIServer(wsgi.Server): self._socket = eventlet.listen(file_socket, family=socket.AF_UNIX, backlog=backlog) - if workers < 1: - # For the case where only one process is required. - self._server = self.pool.spawn_n(self._run, application, - self._socket) - else: - # Minimize the cost of checking for child exit by extending the - # wait interval past the default of 0.01s. - self._launcher = service.ProcessLauncher(wait_interval=1.0) - self._server = WorkerService(self, application) - self._launcher.launch_service(self._server, workers=workers) + + self._launch(application, workers=workers) def _run(self, application, socket): """Start a WSGI service in a new green thread.""" diff --git a/neutron/service.py b/neutron/service.py index 820364f7c1..eb48687cb3 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -40,7 +40,7 @@ service_opts = [ help=_('Seconds between running periodic tasks')), cfg.IntOpt('api_workers', default=0, - help=_('Number of separate worker processes for service')), + help=_('Number of separate API worker processes for service')), cfg.IntOpt('rpc_workers', default=0, help=_('Number of RPC worker processes for service')), diff --git a/neutron/tests/unit/test_metadata_agent.py b/neutron/tests/unit/test_metadata_agent.py index dbb3e12306..dff72cf028 100644 --- a/neutron/tests/unit/test_metadata_agent.py +++ b/neutron/tests/unit/test_metadata_agent.py @@ -495,8 +495,8 @@ class TestUnixDomainWSGIServer(base.BaseTestCase): def test_start(self): mock_app = mock.Mock() - with mock.patch.object(self.server, 'pool') as pool: - self.server.start(mock_app, '/the/path', workers=0, backlog=128) + with mock.patch.object(self.server, '_launch') as launcher: + self.server.start(mock_app, '/the/path', workers=5, backlog=128) self.eventlet.assert_has_calls([ mock.call.listen( '/the/path', @@ -504,27 +504,7 @@ class TestUnixDomainWSGIServer(base.BaseTestCase): backlog=128 )] ) - pool.spawn_n.assert_called_once_with( - self.server._run, - mock_app, - self.eventlet.listen.return_value - ) - - @mock.patch('neutron.openstack.common.service.ProcessLauncher') - def test_start_multiple_workers(self, process_launcher): - launcher = process_launcher.return_value - - mock_app = mock.Mock() - self.server.start(mock_app, '/the/path', workers=2, backlog=128) - launcher.running = True - launcher.launch_service.assert_called_once_with(self.server._server, - workers=2) - - self.server.stop() - self.assertFalse(launcher.running) - - self.server.wait() - launcher.wait.assert_called_once_with() + launcher.assert_called_once_with(mock_app, workers=5) def test_run(self): with mock.patch.object(agent, 'logging') as logging: diff --git a/neutron/tests/unit/test_wsgi.py b/neutron/tests/unit/test_wsgi.py index d122913988..789b27ce62 100644 --- a/neutron/tests/unit/test_wsgi.py +++ b/neutron/tests/unit/test_wsgi.py @@ -56,12 +56,10 @@ class TestWSGIServer(base.BaseTestCase): server = wsgi.Server("test_multiple_processes") server.start(None, 0, host="127.0.0.1", workers=2) - launcher.running = True - launcher.launch_service.assert_called_once_with(server._server, - workers=2) + launcher.launch_service.assert_called_once_with(mock.ANY, workers=2) server.stop() - self.assertFalse(launcher.running) + launcher.stop.assert_called_once_with() server.wait() launcher.wait.assert_called_once_with() diff --git a/neutron/wsgi.py b/neutron/wsgi.py index a98e222637..137dcb96e5 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -97,7 +97,8 @@ class WorkerService(object): self._service._socket) def wait(self): - self._service.pool.waitall() + if isinstance(self._server, eventlet.greenthread.GreenThread): + self._server.wait() def stop(self): if isinstance(self._server, eventlet.greenthread.GreenThread): @@ -113,7 +114,6 @@ class Server(object): eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line self.pool = eventlet.GreenPool(threads) self.name = name - self._launcher = None self._server = None def _get_socket(self, host, port, backlog): @@ -205,17 +205,22 @@ class Server(object): self._socket = self._get_socket(self._host, self._port, backlog=backlog) + + self._launch(application, workers) + + def _launch(self, application, workers=0): + service = WorkerService(self, application) if workers < 1: - # For the case where only one process is required. - self._server = self.pool.spawn(self._run, application, - self._socket) + # The API service should run in the current process. + self._server = service + service.start() systemd.notify_once() else: + # The API service runs in a number of child processes. # Minimize the cost of checking for child exit by extending the # wait interval past the default of 0.01s. - self._launcher = common_service.ProcessLauncher(wait_interval=1.0) - self._server = WorkerService(self, application) - self._launcher.launch_service(self._server, workers=workers) + self._server = common_service.ProcessLauncher(wait_interval=1.0) + self._server.launch_service(service, workers=workers) @property def host(self): @@ -226,19 +231,12 @@ class Server(object): return self._socket.getsockname()[1] if self._socket else self._port def stop(self): - if self._launcher: - # The process launcher does not support stop or kill. - self._launcher.running = False - else: - self._server.kill() + self._server.stop() def wait(self): """Wait until all servers have completed running.""" try: - if self._launcher: - self._launcher.wait() - else: - self.pool.waitall() + self._server.wait() except KeyboardInterrupt: pass