Change metadata-agent to spawn multiple workers
There is currently only one metadata-agent per network node, which could be handling connections from hundreds or thousands of metadata-namespace-proxy processes. This change addes a new "metadata_workers = XX" to the ini file to support creating more workers to help improve performance. Change-Id: Ib9ebcfc543a83982dd93db79c7dc631283fd3bfa Partial-bug: #1274536
This commit is contained in:
parent
ba12cc88ce
commit
32ca9c4f5f
@ -26,3 +26,6 @@ admin_password = %SERVICE_PASSWORD%
|
|||||||
|
|
||||||
# Location of Metadata Proxy UNIX domain socket
|
# Location of Metadata Proxy UNIX domain socket
|
||||||
# metadata_proxy_socket = $state_path/metadata_proxy
|
# metadata_proxy_socket = $state_path/metadata_proxy
|
||||||
|
|
||||||
|
# Number of separate worker processes for metadata server
|
||||||
|
# metadata_workers = 0
|
||||||
|
@ -37,6 +37,7 @@ from neutron.common import utils
|
|||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import loopingcall
|
from neutron.openstack.common import loopingcall
|
||||||
|
from neutron.openstack.common import service
|
||||||
from neutron import wsgi
|
from neutron import wsgi
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -192,12 +193,34 @@ class UnixDomainHttpProtocol(eventlet.wsgi.HttpProtocol):
|
|||||||
server)
|
server)
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerService(wsgi.WorkerService):
|
||||||
|
def start(self):
|
||||||
|
self._server = self._service.pool.spawn(self._service._run,
|
||||||
|
self._application,
|
||||||
|
self._service._socket)
|
||||||
|
|
||||||
|
|
||||||
class UnixDomainWSGIServer(wsgi.Server):
|
class UnixDomainWSGIServer(wsgi.Server):
|
||||||
def start(self, application, file_socket, backlog=128):
|
def __init__(self, name):
|
||||||
sock = eventlet.listen(file_socket,
|
self._socket = None
|
||||||
|
self._launcher = None
|
||||||
|
self._server = None
|
||||||
|
super(UnixDomainWSGIServer, self).__init__(name)
|
||||||
|
|
||||||
|
def start(self, application, file_socket, workers, backlog=128):
|
||||||
|
self._socket = eventlet.listen(file_socket,
|
||||||
family=socket.AF_UNIX,
|
family=socket.AF_UNIX,
|
||||||
backlog=backlog)
|
backlog=backlog)
|
||||||
self.pool.spawn_n(self._run, application, sock)
|
if workers < 1:
|
||||||
|
# For the case where only one process is required.
|
||||||
|
self._server = self.pool.spawn_n(self._run, application,
|
||||||
|
self._socket)
|
||||||
|
else:
|
||||||
|
# Minimize the cost of checking for child exit by extending the
|
||||||
|
# wait interval past the default of 0.01s.
|
||||||
|
self._launcher = service.ProcessLauncher(wait_interval=1.0)
|
||||||
|
self._server = WorkerService(self, application)
|
||||||
|
self._launcher.launch_service(self._server, workers=workers)
|
||||||
|
|
||||||
def _run(self, application, socket):
|
def _run(self, application, socket):
|
||||||
"""Start a WSGI service in a new green thread."""
|
"""Start a WSGI service in a new green thread."""
|
||||||
@ -213,7 +236,11 @@ class UnixDomainMetadataProxy(object):
|
|||||||
OPTS = [
|
OPTS = [
|
||||||
cfg.StrOpt('metadata_proxy_socket',
|
cfg.StrOpt('metadata_proxy_socket',
|
||||||
default='$state_path/metadata_proxy',
|
default='$state_path/metadata_proxy',
|
||||||
help=_('Location for Metadata Proxy UNIX domain socket'))
|
help=_('Location for Metadata Proxy UNIX domain socket')),
|
||||||
|
cfg.IntOpt('metadata_workers',
|
||||||
|
default=0,
|
||||||
|
help=_('Number of separate worker processes for metadata '
|
||||||
|
'server'))
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
@ -271,7 +298,8 @@ class UnixDomainMetadataProxy(object):
|
|||||||
def run(self):
|
def run(self):
|
||||||
server = UnixDomainWSGIServer('neutron-metadata-agent')
|
server = UnixDomainWSGIServer('neutron-metadata-agent')
|
||||||
server.start(MetadataProxyHandler(self.conf),
|
server.start(MetadataProxyHandler(self.conf),
|
||||||
self.conf.metadata_proxy_socket)
|
self.conf.metadata_proxy_socket,
|
||||||
|
workers=self.conf.metadata_workers)
|
||||||
server.wait()
|
server.wait()
|
||||||
|
|
||||||
|
|
||||||
|
@ -275,7 +275,7 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
|
|||||||
def test_start(self):
|
def test_start(self):
|
||||||
mock_app = mock.Mock()
|
mock_app = mock.Mock()
|
||||||
with mock.patch.object(self.server, 'pool') as pool:
|
with mock.patch.object(self.server, 'pool') as pool:
|
||||||
self.server.start(mock_app, '/the/path')
|
self.server.start(mock_app, '/the/path', workers=0)
|
||||||
self.eventlet.assert_has_calls([
|
self.eventlet.assert_has_calls([
|
||||||
mock.call.listen(
|
mock.call.listen(
|
||||||
'/the/path',
|
'/the/path',
|
||||||
@ -289,6 +289,22 @@ class TestUnixDomainWSGIServer(base.BaseTestCase):
|
|||||||
self.eventlet.listen.return_value
|
self.eventlet.listen.return_value
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@mock.patch('neutron.openstack.common.service.ProcessLauncher')
|
||||||
|
def test_start_multiple_workers(self, process_launcher):
|
||||||
|
launcher = process_launcher.return_value
|
||||||
|
|
||||||
|
mock_app = mock.Mock()
|
||||||
|
self.server.start(mock_app, '/the/path', workers=2)
|
||||||
|
launcher.running = True
|
||||||
|
launcher.launch_service.assert_called_once_with(self.server._server,
|
||||||
|
workers=2)
|
||||||
|
|
||||||
|
self.server.stop()
|
||||||
|
self.assertFalse(launcher.running)
|
||||||
|
|
||||||
|
self.server.wait()
|
||||||
|
launcher.wait.assert_called_once_with()
|
||||||
|
|
||||||
def test_run(self):
|
def test_run(self):
|
||||||
with mock.patch.object(agent, 'logging') as logging:
|
with mock.patch.object(agent, 'logging') as logging:
|
||||||
self.server._run('app', 'sock')
|
self.server._run('app', 'sock')
|
||||||
@ -313,6 +329,7 @@ class TestUnixDomainMetadataProxy(base.BaseTestCase):
|
|||||||
self.looping_mock = looping_call_p.start()
|
self.looping_mock = looping_call_p.start()
|
||||||
self.addCleanup(mock.patch.stopall)
|
self.addCleanup(mock.patch.stopall)
|
||||||
self.cfg.CONF.metadata_proxy_socket = '/the/path'
|
self.cfg.CONF.metadata_proxy_socket = '/the/path'
|
||||||
|
self.cfg.CONF.metadata_workers = 0
|
||||||
|
|
||||||
def test_init_doesnot_exists(self):
|
def test_init_doesnot_exists(self):
|
||||||
with mock.patch('os.path.isdir') as isdir:
|
with mock.patch('os.path.isdir') as isdir:
|
||||||
@ -376,7 +393,7 @@ class TestUnixDomainMetadataProxy(base.BaseTestCase):
|
|||||||
server.assert_has_calls([
|
server.assert_has_calls([
|
||||||
mock.call('neutron-metadata-agent'),
|
mock.call('neutron-metadata-agent'),
|
||||||
mock.call().start(handler.return_value,
|
mock.call().start(handler.return_value,
|
||||||
'/the/path'),
|
'/the/path', workers=0),
|
||||||
mock.call().wait()]
|
mock.call().wait()]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user