Creates multiple worker processes for API server

This change to the WSGI code uses openstack.common.service to create
multiple worker processes to handle API load.  The main process starts
a configurable number (api_workers) of child processes, all of which
listen on the bind port.  The main process becomes the parent and
manages the children; the parent itself is not a worker.

Backwards compatibility is preserved by defaulting api_workers to 0.
In this case, no separate worker processes are spawned and the worker
threads run in the main process.

Implement blueprint multi-workers-for-api-server

Change-Id: Iffa76041d0055840ccca852814b0e71f17a950ac
This commit is contained in:
Carl Baldwin 2013-07-12 20:45:38 +00:00
parent 3ada4ec271
commit 2b30920e82
4 changed files with 72 additions and 5 deletions

View File

@ -239,6 +239,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
# =========== end of items for agent scheduler extension ===== # =========== end of items for agent scheduler extension =====
# =========== WSGI parameters related to the API server ============== # =========== WSGI parameters related to the API server ==============
# Number of separate worker processes to spawn. The default, 0, runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as workers. The parent process manages them.
# api_workers = 0
# Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
# starting API server. Not supported on OS X. # starting API server. Not supported on OS X.
# tcp_keepidle = 600 # tcp_keepidle = 600

View File

@ -36,6 +36,9 @@ service_opts = [
cfg.IntOpt('periodic_interval', cfg.IntOpt('periodic_interval',
default=40, default=40,
help=_('Seconds between running periodic tasks')), help=_('Seconds between running periodic tasks')),
cfg.IntOpt('api_workers',
default=0,
help=_('Number of separate worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay', cfg.IntOpt('periodic_fuzzy_delay',
default=5, default=5,
help=_('Range of seconds to randomly delay when starting the ' help=_('Range of seconds to randomly delay when starting the '
@ -111,7 +114,8 @@ def _run_wsgi(app_name):
LOG.error(_('No known API applications configured.')) LOG.error(_('No known API applications configured.'))
return return
server = wsgi.Server("Neutron") server = wsgi.Server("Neutron")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host) server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed # Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"), LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"),

View File

@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase):
server.stop() server.stop()
server.wait() server.wait()
@mock.patch('neutron.wsgi.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
    # ProcessLauncher is patched, so the mock instance records how the
    # server drives the launcher.
    fake_launcher = ProcessLauncher.return_value
    wsgi_server = wsgi.Server("test_multiple_processes")
    wsgi_server.start(None, 0, host="127.0.0.1", workers=2)

    # Starting with workers=2 must hand the worker service to the launcher.
    fake_launcher.launch_service.assert_called_once_with(
        wsgi_server._server, workers=2)

    # stop() is expected to switch the launcher's running flag off.
    fake_launcher.running = True
    wsgi_server.stop()
    self.assertFalse(fake_launcher.running)

    # wait() must delegate to the launcher exactly once.
    wsgi_server.wait()
    fake_launcher.wait.assert_called_once_with()
def test_start_random_port_with_ipv6(self): def test_start_random_port_with_ipv6(self):
server = wsgi.Server("test_random_port") server = wsgi.Server("test_random_port")
server.start(None, 0, host="::1") server.start(None, 0, host="::1")

View File

@ -37,9 +37,11 @@ import webob.exc
from neutron.common import constants from neutron.common import constants
from neutron.common import exceptions as exception from neutron.common import exceptions as exception
from neutron import context from neutron import context
from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import gettextutils from neutron.openstack.common import gettextutils
from neutron.openstack.common import jsonutils from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common.service import ProcessLauncher
socket_opts = [ socket_opts = [
cfg.IntOpt('backlog', cfg.IntOpt('backlog',
@ -82,12 +84,39 @@ def run_server(application, port):
eventlet.wsgi.server(sock, application) eventlet.wsgi.server(sock, application)
class WorkerService(object):
    """Adapter that lets a ProcessLauncher drive a WSGI worker.

    Exposes the start/wait/stop methods and delegates the actual work to
    the pool, ``_run`` callable and socket of the wrapped service.
    """

    def __init__(self, service, application):
        # ``service`` supplies the pool, the ``_run`` callable and the
        # socket used in start(); ``application`` is passed through to
        # ``_run`` unchanged.
        self._server = None
        self._application = application
        self._service = service

    def start(self):
        # We may have just forked from the parent process. Disposing of the
        # existing SQL connections right away avoids producing 500 errors
        # later when they are discovered to be broken.
        engine = session.get_engine(sqlite_fk=True)
        engine.pool.dispose()

        owner = self._service
        self._server = owner.pool.spawn(owner._run,
                                        self._application,
                                        owner._socket)

    def wait(self):
        # Delegate to the pool of the wrapped service.
        worker_pool = self._service.pool
        worker_pool.waitall()

    def stop(self):
        running = self._server
        if not isinstance(running, eventlet.greenthread.GreenThread):
            # Nothing that can be killed is currently tracked.
            return
        running.kill()
        self._server = None
class Server(object): class Server(object):
"""Server class to manage multiple WSGI sockets and applications.""" """Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, threads=1000): def __init__(self, name, threads=1000):
self.pool = eventlet.GreenPool(threads) self.pool = eventlet.GreenPool(threads)
self.name = name self.name = name
self._launcher = None
self._server = None
def _get_socket(self, host, port, backlog): def _get_socket(self, host, port, backlog):
bind_addr = (host, port) bind_addr = (host, port)
@ -166,7 +195,7 @@ class Server(object):
return sock return sock
def start(self, application, port, host='0.0.0.0'): def start(self, application, port, host='0.0.0.0', workers=0):
"""Run a WSGI server with the given application.""" """Run a WSGI server with the given application."""
self._host = host self._host = host
self._port = port self._port = port
@ -175,7 +204,14 @@ class Server(object):
self._socket = self._get_socket(self._host, self._socket = self._get_socket(self._host,
self._port, self._port,
backlog=backlog) backlog=backlog)
self._server = self.pool.spawn(self._run, application, self._socket) if workers < 1:
# For the case where only one process is required.
self._server = self.pool.spawn(self._run, application,
self._socket)
else:
self._launcher = ProcessLauncher()
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
@property @property
def host(self): def host(self):
@ -186,11 +222,18 @@ class Server(object):
return self._socket.getsockname()[1] if self._socket else self._port return self._socket.getsockname()[1] if self._socket else self._port
def stop(self): def stop(self):
if self._launcher:
# The process launcher does not support stop or kill.
self._launcher.running = False
else:
self._server.kill() self._server.kill()
def wait(self): def wait(self):
"""Wait until all servers have completed running.""" """Wait until all servers have completed running."""
try: try:
if self._launcher:
self._launcher.wait()
else:
self.pool.waitall() self.pool.waitall()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass