# Copyright (c) 2013 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc from neutron.common import constants as q_const from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.common import utils from neutron.extensions import portbindings from neutron import manager from neutron.openstack.common import log from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel # REVISIT(kmestery): Allow the type and mechanism drivers to supply the # mixins and eventually remove the direct dependencies on type_tunnel. LOG = log.getLogger(__name__) class RpcCallbacks(n_rpc.RpcCallback, type_tunnel.TunnelRpcCallbackMixin): RPC_API_VERSION = '1.3' # history # 1.0 Initial version (from openvswitch/linuxbridge) # 1.1 Support Security Group RPC # 1.2 Support get_devices_details_list # 1.3 get_device_details rpc signature upgrade to obtain 'host' and # return value to include fixed_ips and device_owner for # the device port def __init__(self, notifier, type_manager): self.setup_tunnel_callback_mixin(notifier, type_manager) super(RpcCallbacks, self).__init__() def get_device_details(self, rpc_context, **kwargs): """Agent requests device details.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') host = kwargs.get('host') LOG.debug("Device %(device)s details requested by agent " "%(agent_id)s with host %(host)s", {'device': device, 'agent_id': agent_id, 'host': host}) plugin = manager.NeutronManager.get_plugin() port_id = plugin._device_to_port_id(device) port_context = plugin.get_bound_port_context(rpc_context, port_id, host) if not port_context: LOG.warning(_("Device %(device)s requested by agent " "%(agent_id)s not found in database"), {'device': device, 'agent_id': agent_id}) return {'device': device} segment = port_context.bound_segment port = port_context.current if not segment: LOG.warning(_("Device %(device)s requested by agent " "%(agent_id)s on network %(network_id)s not " "bound, vif_type: %(vif_type)s"), {'device': device, 'agent_id': agent_id, 'network_id': port['network_id'], 'vif_type': port[portbindings.VIF_TYPE]}) return {'device': device} new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up'] else q_const.PORT_STATUS_DOWN) if port['status'] != new_status: plugin.update_port_status(rpc_context, port_id, new_status, host) entry = {'device': device, 'network_id': port['network_id'], 'port_id': port_id, 'mac_address': port['mac_address'], 'admin_state_up': port['admin_state_up'], 'network_type': segment[api.NETWORK_TYPE], 'segmentation_id': segment[api.SEGMENTATION_ID], 'physical_network': segment[api.PHYSICAL_NETWORK], 'fixed_ips': port['fixed_ips'], 'device_owner': port['device_owner'], 'profile': port[portbindings.PROFILE]} LOG.debug(_("Returning: %s"), entry) return entry def get_devices_details_list(self, rpc_context, **kwargs): return [ self.get_device_details( rpc_context, device=device, **kwargs ) for device in kwargs.pop('devices', []) ] def update_device_down(self, rpc_context, **kwargs): """Device no longer exists on agent.""" # TODO(garyk) - live migration and port status agent_id = kwargs.get('agent_id') device = kwargs.get('device') host = kwargs.get('host') LOG.debug(_("Device %(device)s no longer exists at agent " "%(agent_id)s"), {'device': device, 'agent_id': agent_id}) plugin = manager.NeutronManager.get_plugin() port_id = plugin._device_to_port_id(device) port_exists = True if (host and not plugin.port_bound_to_host(rpc_context, port_id, host)): LOG.debug(_("Device %(device)s not bound to the" " agent host %(host)s"), {'device': device, 'host': host}) return {'device': device, 'exists': port_exists} port_exists = bool(plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_DOWN, host)) return {'device': device, 'exists': port_exists} def update_device_up(self, rpc_context, **kwargs): """Device is up on agent.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') host = kwargs.get('host') LOG.debug(_("Device %(device)s up at agent %(agent_id)s"), {'device': device, 'agent_id': agent_id}) plugin = manager.NeutronManager.get_plugin() port_id = plugin._device_to_port_id(device) if (host and not plugin.port_bound_to_host(rpc_context, port_id, host)): LOG.debug(_("Device %(device)s not bound to the" " agent host %(host)s"), {'device': device, 'host': host}) return port_id = plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_ACTIVE, host) l3plugin = manager.NeutronManager.get_service_plugins().get( service_constants.L3_ROUTER_NAT) if (l3plugin and utils.is_extension_supported(l3plugin, q_const.L3_DISTRIBUTED_EXT_ALIAS)): try: l3plugin.dvr_vmarp_table_update(rpc_context, port_id, "add") except exceptions.PortNotFound: LOG.debug('Port %s not found during ARP update', port_id) class AgentNotifierApi(n_rpc.RpcProxy, dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): """Agent side of the openvswitch rpc API. API version history: 1.0 - Initial version. 1.1 - Added get_active_networks_info, create_dhcp_port, update_dhcp_port, and removed get_dhcp_port methods. """ BASE_RPC_API_VERSION = '1.1' def __init__(self, topic): super(AgentNotifierApi, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) self.topic_network_delete = topics.get_topic_name(topic, topics.NETWORK, topics.DELETE) self.topic_port_update = topics.get_topic_name(topic, topics.PORT, topics.UPDATE) def network_delete(self, context, network_id): self.fanout_cast(context, self.make_msg('network_delete', network_id=network_id), topic=self.topic_network_delete) def port_update(self, context, port, network_type, segmentation_id, physical_network): self.fanout_cast(context, self.make_msg('port_update', port=port, network_type=network_type, segmentation_id=segmentation_id, physical_network=physical_network), topic=self.topic_port_update)