Boden R cd1fef8a81 use common agent topics from neutron-lib
The neutron.common.topics module was rehomed into neutron-lib with
commit Ie88b84949cbd55a4e7ad06341aab77b286cdc485
This patch consumes it by using neutron-lib's version of the module in
prep for Ia4a4604c259ce862597de80c6deeb3d408bf0e95

Change-Id: I34fb241499445e2a462b10815a26b79bded0fb78
2018-02-28 13:10:15 -07:00

182 lines
7.9 KiB
Python

# Copyright 2013 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib.agent import topics
from neutron_lib import constants as const
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.common import rpc as n_rpc
from neutron.db import agents_db
from vmware_nsx._i18n import _
from vmware_nsx.common import config
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.dhcp_meta import combined
from vmware_nsx.dhcp_meta import lsnmanager
from vmware_nsx.dhcp_meta import migration
from vmware_nsx.dhcp_meta import nsx as nsx_svc
from vmware_nsx.dhcp_meta import rpc as nsx_rpc
from vmware_nsx.extensions import lsn
LOG = logging.getLogger(__name__)
class SynchronizedDhcpRpcCallback(dhcp_rpc.DhcpRpcCallback):
"""DHCP RPC callbakcs synchronized with VMware plugin mutex."""
@lockutils.synchronized('vmware', 'neutron-')
def create_dhcp_port(self, context, **kwargs):
return super(SynchronizedDhcpRpcCallback, self).create_dhcp_port(
context, **kwargs)
class DhcpMetadataAccess(object):
def setup_dhcpmeta_access(self):
"""Initialize support for DHCP and Metadata services."""
self._init_extensions()
if cfg.CONF.NSX.agent_mode == config.AgentModes.AGENT:
self._setup_rpc_dhcp_metadata()
mod = nsx_rpc
elif cfg.CONF.NSX.agent_mode == config.AgentModes.AGENTLESS:
self._setup_nsx_dhcp_metadata()
mod = nsx_svc
elif cfg.CONF.NSX.agent_mode == config.AgentModes.COMBINED:
notifier = self._setup_nsx_dhcp_metadata()
self._setup_rpc_dhcp_metadata(notifier=notifier)
mod = combined
else:
error = _("Invalid agent_mode: %s") % cfg.CONF.NSX.agent_mode
LOG.error(error)
raise nsx_exc.NsxPluginException(err_msg=error)
self.handle_network_dhcp_access_delegate = (
mod.handle_network_dhcp_access
)
self.handle_port_dhcp_access_delegate = (
mod.handle_port_dhcp_access
)
self.handle_port_metadata_access_delegate = (
mod.handle_port_metadata_access
)
self.handle_metadata_access_delegate = (
mod.handle_router_metadata_access
)
def _setup_rpc_dhcp_metadata(self, notifier=None):
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection()
self.endpoints = [SynchronizedDhcpRpcCallback(),
agents_db.AgentExtRpcCallback(),
metadata_rpc.MetadataRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
self.conn.consume_in_threads()
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
self.supported_extension_aliases.extend(
['agent', 'dhcp_agent_scheduler'])
def _setup_nsx_dhcp_metadata(self):
self._check_services_requirements()
nsx_svc.register_dhcp_opts(cfg)
nsx_svc.register_metadata_opts(cfg)
lsnmanager.register_lsn_opts(cfg)
lsn_manager = lsnmanager.PersistentLsnManager(self.safe_reference)
self.lsn_manager = lsn_manager
if cfg.CONF.NSX.agent_mode == config.AgentModes.AGENTLESS:
notifier = nsx_svc.DhcpAgentNotifyAPI(self.safe_reference,
lsn_manager)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = notifier
# In agentless mode, ports whose owner is DHCP need to
# be special cased; so add it to the list of special
# owners list
if const.DEVICE_OWNER_DHCP not in self.port_special_owners:
self.port_special_owners.append(const.DEVICE_OWNER_DHCP)
elif cfg.CONF.NSX.agent_mode == config.AgentModes.COMBINED:
# This becomes ineffective, as all new networks creations
# are handled by Logical Services Nodes in NSX
cfg.CONF.set_override('network_auto_schedule', False)
LOG.warning('network_auto_schedule has been disabled')
notifier = combined.DhcpAgentNotifyAPI(self.safe_reference,
lsn_manager)
self.supported_extension_aliases.append(lsn.EXT_ALIAS)
# Add the capability to migrate dhcp and metadata services over
self.migration_manager = (
migration.MigrationManager(
self.safe_reference, lsn_manager, notifier))
return notifier
def _init_extensions(self):
extensions = (lsn.EXT_ALIAS, 'agent', 'dhcp_agent_scheduler')
for ext in extensions:
if ext in self.supported_extension_aliases:
self.supported_extension_aliases.remove(ext)
def _check_services_requirements(self):
try:
error = None
nsx_svc.check_services_requirements(self.cluster)
except nsx_exc.InvalidVersion:
error = _("Unable to run Neutron with config option '%s', as NSX "
"does not support it") % cfg.CONF.NSX.agent_mode
except nsx_exc.ServiceClusterUnavailable:
error = _("Unmet dependency for config option "
"'%s'") % cfg.CONF.NSX.agent_mode
if error:
LOG.error(error)
raise nsx_exc.NsxPluginException(err_msg=error)
def get_lsn(self, context, network_id, fields=None):
report = self.migration_manager.report(context, network_id)
return {'network': network_id, 'report': report}
def create_lsn(self, context, lsn):
network_id = lsn['lsn']['network']
subnet = self.migration_manager.validate(context, network_id)
subnet_id = None if not subnet else subnet['id']
self.migration_manager.migrate(context, network_id, subnet)
r = self.migration_manager.report(context, network_id, subnet_id)
return {'network': network_id, 'report': r}
def handle_network_dhcp_access(self, context, network, action):
self.handle_network_dhcp_access_delegate(self.safe_reference, context,
network, action)
def handle_port_dhcp_access(self, context, port_data, action):
self.handle_port_dhcp_access_delegate(self.safe_reference, context,
port_data, action)
def handle_port_metadata_access(self, context, port, is_delete=False):
self.handle_port_metadata_access_delegate(self.safe_reference, context,
port, is_delete)
def handle_router_metadata_access(self, context,
router_id, interface=None):
self.handle_metadata_access_delegate(self.safe_reference, context,
router_id, interface)