Adds multiple RPC worker processes to neutron server

blueprint multiple-rpc-workers

Co-Authored-By: Terry Wilson<twilson@redhat.com>
Change-Id: I51f2a52add6c11af905e6f1e6e45d31731ebbb5d
This commit is contained in:
Carl Baldwin 2014-02-11 05:55:29 +00:00
parent 473dd1042d
commit bff62b40a8
8 changed files with 112 additions and 4 deletions

View File

@ -257,6 +257,11 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
# child processes as workers. The parent process manages them.
# api_workers = 0
# Number of separate RPC worker processes to spawn. The default, 0, runs the
# worker thread in the current process. Greater than 0 launches that number of
# child processes as RPC workers. The parent process manages them.
# rpc_workers = 0
# Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
# starting API server. Not supported on OS X.
# tcp_keepidle = 600

View File

@ -324,3 +324,15 @@ class NeutronPluginBaseV2(object):
:param id: UUID representing the port to delete.
"""
pass
def start_rpc_listener(self):
"""Start the rpc listener.
Most plugins start an RPC listener implicitly on initialization. In
order to support multiple process RPC, the plugin needs to expose
control over when this is started.
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
raise NotImplementedError

View File

@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection):
ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
return self.connection.consume_in_thread()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""

View File

@ -122,13 +122,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
def start_rpc_listener(self):
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
self.conn.consume_in_thread()
return self.conn.consume_in_thread()
def _process_provider_segment(self, segment):
network_type = self._get_attribute(segment, provider.NETWORK_TYPE)

View File

@ -27,8 +27,11 @@ from neutron.common import config
from neutron import service
from neutron.openstack.common import gettextutils
from neutron.openstack.common import log as logging
gettextutils.install('neutron', lazy=True)
LOG = logging.getLogger(__name__)
def main():
eventlet.monkey_patch()
@ -40,8 +43,25 @@ def main():
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
" the '--config-file' option!"))
try:
neutron_service = service.serve_wsgi(service.NeutronApiService)
neutron_service.wait()
pool = eventlet.GreenPool()
neutron_api = service.serve_wsgi(service.NeutronApiService)
api_thread = pool.spawn(neutron_api.wait)
try:
neutron_rpc = service.serve_rpc()
except NotImplementedError:
LOG.info(_("RPC was already started in parent process by plugin."))
else:
rpc_thread = pool.spawn(neutron_rpc.wait)
# api and rpc should die together. When one dies, kill the other.
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())
pool.waitall()
except KeyboardInterrupt:
pass
except RuntimeError as e:
sys.exit(_("ERROR: %s") % e)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import inspect
import logging as std_logging
import os
@ -23,11 +24,15 @@ from oslo.config import cfg
from neutron.common import config
from neutron.common import legacy
from neutron import context
from neutron import manager
from neutron import neutron_plugin_base_v2
from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import service
from neutron.openstack.common.service import ProcessLauncher
from neutron import wsgi
@ -38,6 +43,9 @@ service_opts = [
cfg.IntOpt('api_workers',
default=0,
help=_('Number of separate worker processes for service')),
cfg.IntOpt('rpc_workers',
default=0,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
@ -108,6 +116,61 @@ def serve_wsgi(cls):
return service
class RpcWorker(object):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
self._server = None
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are
# discovered to be broken.
session.get_engine(sqlite_fk=True).pool.dispose()
self._server = self._plugin.start_rpc_listener()
def wait(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.wait()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.kill()
self._server = None
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
# If 0 < rpc_workers then start_rpc_listener would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin overrides
# start_rpc_listener.
base = neutron_plugin_base_v2.NeutronPluginBaseV2
if plugin.__class__.start_rpc_listener == base.start_rpc_listener:
LOG.debug(_("Active plugin doesn't implement start_rpc_listener"))
if 0 < cfg.CONF.rpc_workers:
msg = _("'rpc_workers = %d' ignored because start_rpc_listener "
"is not implemented.")
LOG.error(msg, cfg.CONF.rpc_workers)
raise NotImplementedError
try:
rpc = RpcWorker(plugin)
if cfg.CONF.rpc_workers < 1:
rpc.start()
return rpc
else:
launcher = ProcessLauncher(wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_('Unrecoverable error: please check log '
'for details.'))
def _run_wsgi(app_name):
app = config.load_paste_app(app_name)
if not app:

View File

@ -40,6 +40,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
super(PortBindingTestCase, self).setUp(PLUGIN_NAME)
self.port_create_status = 'DOWN'
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.start_rpc_listener()
def _check_response(self, port, vif_type, has_port_filter, bound):
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)

View File

@ -51,6 +51,11 @@ class Ml2SecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
def setUp(self):
super(TestMl2SecurityGroups, self).setUp()
plugin = manager.NeutronManager.get_plugin()
plugin.start_rpc_listener()
def test_security_group_get_port_from_device(self):
with self.network() as n:
with self.subnet(n):