Rename workers to api_workers and simplify code

Refactor a few ugly aspects of the multiple API worker patch to make
way for multiple rpc workers.  This came up as I was trying to add
multiple RPC workers using similar patterns and remembering that some
things were left in a rather awkward state.

Change-Id: I549db67af4af6a2df80e12cf233109dda5213c47
This commit is contained in:
Carl Baldwin 2014-02-11 00:58:42 +00:00
parent 893bc994c4
commit af135cec1d
5 changed files with 23 additions and 56 deletions

View File

@ -40,7 +40,6 @@ from neutron.openstack.common.cache import cache
from neutron.openstack.common import excutils from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall from neutron.openstack.common import loopingcall
from neutron.openstack.common import service
from neutron import wsgi from neutron import wsgi
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -280,16 +279,8 @@ class UnixDomainWSGIServer(wsgi.Server):
self._socket = eventlet.listen(file_socket, self._socket = eventlet.listen(file_socket,
family=socket.AF_UNIX, family=socket.AF_UNIX,
backlog=backlog) backlog=backlog)
if workers < 1:
# For the case where only one process is required. self._launch(application, workers=workers)
self._server = self.pool.spawn_n(self._run, application,
self._socket)
else:
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._launcher = service.ProcessLauncher(wait_interval=1.0)
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
def _run(self, application, socket): def _run(self, application, socket):
"""Start a WSGI service in a new green thread.""" """Start a WSGI service in a new green thread."""

View File

@ -40,7 +40,7 @@ service_opts = [
help=_('Seconds between running periodic tasks')), help=_('Seconds between running periodic tasks')),
cfg.IntOpt('api_workers', cfg.IntOpt('api_workers',
default=0, default=0,
help=_('Number of separate worker processes for service')), help=_('Number of separate API worker processes for service')),
cfg.IntOpt('rpc_workers', cfg.IntOpt('rpc_workers',
default=0, default=0,
help=_('Number of RPC worker processes for service')), help=_('Number of RPC worker processes for service')),

View File

@ -495,8 +495,8 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
def test_start(self): def test_start(self):
mock_app = mock.Mock() mock_app = mock.Mock()
with mock.patch.object(self.server, 'pool') as pool: with mock.patch.object(self.server, '_launch') as launcher:
self.server.start(mock_app, '/the/path', workers=0, backlog=128) self.server.start(mock_app, '/the/path', workers=5, backlog=128)
self.eventlet.assert_has_calls([ self.eventlet.assert_has_calls([
mock.call.listen( mock.call.listen(
'/the/path', '/the/path',
@ -504,27 +504,7 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
backlog=128 backlog=128
)] )]
) )
pool.spawn_n.assert_called_once_with( launcher.assert_called_once_with(mock_app, workers=5)
self.server._run,
mock_app,
self.eventlet.listen.return_value
)
@mock.patch('neutron.openstack.common.service.ProcessLauncher')
def test_start_multiple_workers(self, process_launcher):
launcher = process_launcher.return_value
mock_app = mock.Mock()
self.server.start(mock_app, '/the/path', workers=2, backlog=128)
launcher.running = True
launcher.launch_service.assert_called_once_with(self.server._server,
workers=2)
self.server.stop()
self.assertFalse(launcher.running)
self.server.wait()
launcher.wait.assert_called_once_with()
def test_run(self): def test_run(self):
with mock.patch.object(agent, 'logging') as logging: with mock.patch.object(agent, 'logging') as logging:

View File

@ -56,12 +56,10 @@ class TestWSGIServer(base.BaseTestCase):
server = wsgi.Server("test_multiple_processes") server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2) server.start(None, 0, host="127.0.0.1", workers=2)
launcher.running = True launcher.launch_service.assert_called_once_with(mock.ANY, workers=2)
launcher.launch_service.assert_called_once_with(server._server,
workers=2)
server.stop() server.stop()
self.assertFalse(launcher.running) launcher.stop.assert_called_once_with()
server.wait() server.wait()
launcher.wait.assert_called_once_with() launcher.wait.assert_called_once_with()

View File

@ -97,7 +97,8 @@ class WorkerService(object):
self._service._socket) self._service._socket)
def wait(self): def wait(self):
self._service.pool.waitall() if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.wait()
def stop(self): def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread): if isinstance(self._server, eventlet.greenthread.GreenThread):
@ -113,7 +114,6 @@ class Server(object):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.pool = eventlet.GreenPool(threads) self.pool = eventlet.GreenPool(threads)
self.name = name self.name = name
self._launcher = None
self._server = None self._server = None
def _get_socket(self, host, port, backlog): def _get_socket(self, host, port, backlog):
@ -205,17 +205,22 @@ class Server(object):
self._socket = self._get_socket(self._host, self._socket = self._get_socket(self._host,
self._port, self._port,
backlog=backlog) backlog=backlog)
self._launch(application, workers)
def _launch(self, application, workers=0):
service = WorkerService(self, application)
if workers < 1: if workers < 1:
# For the case where only one process is required. # The API service should run in the current process.
self._server = self.pool.spawn(self._run, application, self._server = service
self._socket) service.start()
systemd.notify_once() systemd.notify_once()
else: else:
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the # Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s. # wait interval past the default of 0.01s.
self._launcher = common_service.ProcessLauncher(wait_interval=1.0) self._server = common_service.ProcessLauncher(wait_interval=1.0)
self._server = WorkerService(self, application) self._server.launch_service(service, workers=workers)
self._launcher.launch_service(self._server, workers=workers)
@property @property
def host(self): def host(self):
@ -226,19 +231,12 @@ class Server(object):
return self._socket.getsockname()[1] if self._socket else self._port return self._socket.getsockname()[1] if self._socket else self._port
def stop(self): def stop(self):
if self._launcher: self._server.stop()
# The process launcher does not support stop or kill.
self._launcher.running = False
else:
self._server.kill()
def wait(self): def wait(self):
"""Wait until all servers have completed running.""" """Wait until all servers have completed running."""
try: try:
if self._launcher: self._server.wait()
self._launcher.wait()
else:
self.pool.waitall()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass