vmware-nsx/neutron/plugins/ofagent/agent/ofa_neutron_agent.py
Ihar Hrachyshka 3aca3f7745 Port to oslo.messaging
Now that all preparations are done, actually port the code to use
oslo.messaging. This patch does as little as possible. Follow up patches
that refactor and cleanup the code and configuration files, will be
merged later. The reason for this is to make the patch as slim as
possible, to make review process more smooth and concentrated.

Details:
* neutron/common/rpc.py:
  - added init() and cleanup() to set global RPC layer state.
  - added utility functions: get_server(), get_client(), get_notifier()
    that wrap up oslo.messaging API a bit, enforcing eventlet executor
    and setting serializer, among other things.
  - removed PluginRpcDispatcher, instead introduced PluginRpcSerializer
    to use as a default serializer for API callbacks.

* neutron/common/rpc_compat.py:
  - emulated incubator RPC layer behaviour thru previously introduced
    stub classes (RpcCallback, RpcProxy, ...) using new oslo.messaging
    API.
  - switched to using new oslo.messaging exception types.

* neutron/service.py:
  - expect multiple RPC listeners that are of MessageHandlingServer
    type, not GreenThread.

* neutron/common/config.py:
  - initialize RPC layer in init()

* setup.cfg:
  - added entry points for old notifier drivers to retain backward
    compatibility.

* neutron/tests/...:
  - introduced fake_notifier to replace impl_fake.
  - faked out consume_in_thread() to avoid starting RPC listeners when
    running unit tests.
  - used 'fake' transport driver.
  - made sure neutron.test.* exceptions are caught.
  - initialize and clean up RPC layer for each test case.

* Ported all affected code from using neutron.openstack.common.notifier
  API to oslo.messaging.Notifier.

* rpc.set_defaults() was renamed to rpc.set_transport_defaults()

* other changes not worth mentioning here.

blueprint oslo-messaging

DocImpact

Change-Id: I5a91c34df6e300f2dc46217b1b16352fcc3039fc
2014-06-19 12:58:01 +02:00

1427 lines
63 KiB
Python

# Copyright (C) 2014 VA Linux Systems Japan K.K.
# Based on openvswitch agent.
#
# Copyright 2011 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
import time
import netaddr
from oslo.config import cfg
from ryu.app.ofctl import api as ryu_api
from ryu.base import app_manager
from ryu.lib import hub
from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import polling
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as n_const
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.plugins.common import constants as p_const
from neutron.plugins.ofagent.common import config # noqa
from neutron.plugins.openvswitch.common import constants
LOG = logging.getLogger(__name__)
# A placeholder for dead vlans.
DEAD_VLAN_TAG = str(n_const.MAX_VLAN_TAG + 1)
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
# attributes set).
class LocalVLANMapping:
def __init__(self, vlan, network_type, physical_network, segmentation_id,
vif_ports=None):
if vif_ports is None:
vif_ports = {}
self.vlan = vlan
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
self.vif_ports = vif_ports
# set of tunnel ports on which packets should be flooded
self.tun_ofports = set()
def __str__(self):
return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
(self.vlan, self.network_type, self.physical_network,
self.segmentation_id))
class OVSBridge(ovs_lib.OVSBridge):
def __init__(self, br_name, root_helper, ryuapp):
super(OVSBridge, self).__init__(br_name, root_helper)
self.datapath_id = None
self.datapath = None
self.ofparser = None
self.ryuapp = ryuapp
def find_datapath_id(self):
self.datapath_id = self.get_datapath_id()
def get_datapath(self, retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
retry = 0
while self.datapath is None:
self.datapath = ryu_api.get_datapath(self.ryuapp,
int(self.datapath_id, 16))
retry += 1
if retry >= retry_max:
LOG.error(_('Agent terminated!: Failed to get a datapath.'))
raise SystemExit(1)
time.sleep(1)
self.ofparser = self.datapath.ofproto_parser
def setup_ofp(self, controller_names=None,
protocols='OpenFlow13',
retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
if not controller_names:
host = cfg.CONF.ofp_listen_host
if not host:
# 127.0.0.1 is a default for agent style of controller
host = '127.0.0.1'
controller_names = ["tcp:%s:%d" % (host,
cfg.CONF.ofp_tcp_listen_port)]
try:
self.set_protocols(protocols)
self.set_controller(controller_names)
except RuntimeError:
LOG.exception(_("Agent terminated"))
raise SystemExit(1)
self.find_datapath_id()
self.get_datapath(retry_max)
class OFAPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
pass
class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, context, plugin_rpc, root_helper):
self.context = context
self.plugin_rpc = plugin_rpc
self.root_helper = root_helper
self.init_firewall(defer_refresh_firewall=True)
class OFANeutronAgentRyuApp(app_manager.RyuApp):
OFP_VERSIONS = [ryu_ofp13.OFP_VERSION]
def start(self):
super(OFANeutronAgentRyuApp, self).start()
return hub.spawn(self._agent_main, self)
def _agent_main(self, ryuapp):
cfg.CONF.register_opts(ip_lib.OPTS)
n_utils.log_opt_values(LOG)
try:
agent_config = create_agent_config_map(cfg.CONF)
except ValueError:
LOG.exception(_("Agent failed to create agent config map"))
raise SystemExit(1)
is_xen_compute_host = ('rootwrap-xen-dom0' in
agent_config['root_helper'])
if is_xen_compute_host:
# Force ip_lib to always use the root helper to ensure that ip
# commands target xen dom0 rather than domU.
cfg.CONF.set_default('ip_lib_force_root', True)
agent = OFANeutronAgent(ryuapp, **agent_config)
# Start everything.
LOG.info(_("Agent initialized successfully, now running... "))
agent.daemon_loop()
class OFANeutronAgent(rpc_compat.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
"""A agent for OpenFlow Agent ML2 mechanism driver.
OFANeutronAgent is a OpenFlow Agent agent for a ML2 plugin.
This is as a ryu application thread.
- An agent acts as an OpenFlow controller on each compute nodes.
- OpenFlow 1.3 (vendor agnostic unlike OVS extensions).
"""
# history
# 1.0 Initial version
# 1.1 Support Security Group RPC
RPC_API_VERSION = '1.1'
def __init__(self, ryuapp, integ_br, tun_br, local_ip,
bridge_mappings, root_helper,
polling_interval, tunnel_types=None,
veth_mtu=None, l2_population=False,
minimize_polling=False,
ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN)):
"""Constructor.
:param ryuapp: object of the ryu app.
:param integ_br: name of the integration bridge.
:param tun_br: name of the tunnel bridge.
:param local_ip: local IP address of this hypervisor.
:param bridge_mappings: mappings from physical network name to bridge.
:param root_helper: utility to use when running shell cmds.
:param polling_interval: interval (secs) to poll DB.
:param tunnel_types: A list of tunnel types to enable support for in
the agent. If set, will automatically set enable_tunneling to
True.
:param veth_mtu: MTU size for veth interfaces.
:param minimize_polling: Optional, whether to minimize polling by
monitoring ovsdb for interface changes.
:param ovsdb_monitor_respawn_interval: Optional, when using polling
minimization, the number of seconds to wait before respawning
the ovsdb monitor.
"""
super(OFANeutronAgent, self).__init__()
self.ryuapp = ryuapp
self.veth_mtu = veth_mtu
self.root_helper = root_helper
self.available_local_vlans = set(xrange(n_const.MIN_VLAN_TAG,
n_const.MAX_VLAN_TAG))
self.tunnel_types = tunnel_types or []
self.l2_pop = l2_population
self.agent_state = {
'binary': 'neutron-ofa-agent',
'host': cfg.CONF.host,
'topic': n_const.L2_AGENT_TOPIC,
'configurations': {'bridge_mappings': bridge_mappings,
'tunnel_types': self.tunnel_types,
'tunneling_ip': local_ip,
'l2_population': self.l2_pop},
'agent_type': n_const.AGENT_TYPE_OFA,
'start_flag': True}
# Keep track of int_br's device count for use by _report_state()
self.int_br_device_count = 0
self.int_br = OVSBridge(integ_br, self.root_helper, self.ryuapp)
# Stores port update notifications for processing in main loop
self.updated_ports = set()
self.setup_rpc()
self.setup_integration_br()
self.setup_physical_bridges(bridge_mappings)
self.local_vlan_map = {}
self.tun_br_ofports = {p_const.TYPE_GRE: {},
p_const.TYPE_VXLAN: {}}
self.polling_interval = polling_interval
self.minimize_polling = minimize_polling
self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
self.enable_tunneling = bool(self.tunnel_types)
self.local_ip = local_ip
self.tunnel_count = 0
self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
self.dont_fragment = cfg.CONF.AGENT.dont_fragment
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
# Collect additional bridges to monitor
self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
# Security group agent support
self.sg_agent = OFASecurityGroupAgent(self.context,
self.plugin_rpc,
self.root_helper)
# Initialize iteration counter
self.iter_num = 0
def _report_state(self):
# How many devices are likely used by a VM
self.agent_state.get('configurations')['devices'] = (
self.int_br_device_count)
try:
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_("Failed reporting state!"))
def _create_tunnel_port_name(self, tunnel_type, ip_address):
try:
ip_hex = '%08x' % netaddr.IPAddress(ip_address, version=4)
return '%s-%s' % (tunnel_type, ip_hex)
except Exception:
LOG.warn(_("Unable to create tunnel port. Invalid remote IP: %s"),
ip_address)
def ryu_send_msg(self, msg):
result = ryu_api.send_msg(self.ryuapp, msg)
LOG.info(_("ryu send_msg() result: %s"), result)
def setup_rpc(self):
mac = self.int_br.get_local_port_mac()
self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = OFAPluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.dispatcher = self.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
if vif_id in vlan_mapping.vif_ports:
return network_id
def network_delete(self, context, **kwargs):
network_id = kwargs.get('network_id')
LOG.debug(_("network_delete received network %s"), network_id)
# The network may not be defined on this agent
lvm = self.local_vlan_map.get(network_id)
if lvm:
self.reclaim_local_vlan(network_id)
else:
LOG.debug(_("Network %s not used on agent."), network_id)
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
self.updated_ports.add(port['id'])
LOG.debug(_("port_update received port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
LOG.debug(_("tunnel_update received"))
if not self.enable_tunneling:
return
tunnel_ip = kwargs.get('tunnel_ip')
tunnel_type = kwargs.get('tunnel_type')
if not tunnel_type:
LOG.error(_("No tunnel_type specified, cannot create tunnels"))
return
if tunnel_type not in self.tunnel_types:
LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type)
return
if tunnel_ip == self.local_ip:
return
tun_name = self._create_tunnel_port_name(tunnel_type, tunnel_ip)
if not tun_name:
return
self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self]
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
segmentation_id, ofports):
br = self.tun_br
match = br.ofparser.OFPMatch(
vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)
actions = [br.ofparser.OFPActionPopVlan(),
br.ofparser.OFPActionSetField(
tunnel_id=int(segmentation_id))]
for ofport in ofports:
actions.append(br.ofparser.OFPActionOutput(ofport, 0))
instructions = [br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
msg = br.ofparser.OFPFlowMod(
br.datapath,
table_id=constants.FLOOD_TO_TUN,
priority=1,
match=match, instructions=instructions)
self.ryu_send_msg(msg)
def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,
segmentation_id):
br = self.tun_br
match = br.ofparser.OFPMatch(
tunnel_id=int(segmentation_id))
actions = [
br.ofparser.OFPActionPushVlan(),
br.ofparser.OFPActionSetField(
vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)]
instructions = [
br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS, actions),
br.ofparser.OFPInstructionGotoTable(
table_id=constants.LEARN_FROM_TUN)]
msg = br.ofparser.OFPFlowMod(
br.datapath,
table_id=constants.TUN_TABLE[network_type],
priority=1,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
def _local_vlan_for_tunnel(self, lvid, network_type, segmentation_id):
ofports = [int(ofport) for ofport in
self.tun_br_ofports[network_type].values()]
if ofports:
self._provision_local_vlan_outbound_for_tunnel(
lvid, segmentation_id, ofports)
self._provision_local_vlan_inbound_for_tunnel(lvid, network_type,
segmentation_id)
def _provision_local_vlan_outbound(self, lvid, vlan_vid, physical_network):
br = self.phys_brs[physical_network]
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
match = ofpp.OFPMatch(in_port=int(self.phys_ofports[physical_network]),
vlan_vid=int(lvid) | ofp.OFPVID_PRESENT)
if vlan_vid == ofp.OFPVID_NONE:
actions = [ofpp.OFPActionPopVlan()]
else:
actions = [ofpp.OFPActionSetField(vlan_vid=vlan_vid)]
actions += [ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0)]
instructions = [
ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions),
]
msg = ofpp.OFPFlowMod(datapath, priority=4, match=match,
instructions=instructions)
self.ryu_send_msg(msg)
def _provision_local_vlan_inbound(self, lvid, vlan_vid, physical_network):
datapath = self.int_br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
match = ofpp.OFPMatch(in_port=int(self.int_ofports[physical_network]),
vlan_vid=vlan_vid)
if vlan_vid == ofp.OFPVID_NONE:
actions = [ofpp.OFPActionPushVlan()]
else:
actions = []
actions += [
ofpp.OFPActionSetField(vlan_vid=int(lvid) | ofp.OFPVID_PRESENT),
ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
]
instructions = [
ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions),
]
msg = ofpp.OFPFlowMod(datapath, priority=3, match=match,
instructions=instructions)
self.ryu_send_msg(msg)
def _local_vlan_for_flat(self, lvid, physical_network):
vlan_vid = ryu_ofp13.OFPVID_NONE
self._provision_local_vlan_outbound(lvid, vlan_vid, physical_network)
self._provision_local_vlan_inbound(lvid, vlan_vid, physical_network)
def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id):
vlan_vid = int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT
self._provision_local_vlan_outbound(lvid, vlan_vid, physical_network)
self._provision_local_vlan_inbound(lvid, vlan_vid, physical_network)
def provision_local_vlan(self, net_uuid, network_type, physical_network,
segmentation_id):
"""Provisions a local VLAN.
:param net_uuid: the uuid of the network associated with this vlan.
:param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat',
'local')
:param physical_network: the physical network for 'vlan' or 'flat'
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
"""
if not self.available_local_vlans:
LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
return
lvid = self.available_local_vlans.pop()
LOG.info(_("Assigning %(vlan_id)s as local vlan for "
"net-id=%(net_uuid)s"),
{'vlan_id': lvid, 'net_uuid': net_uuid})
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
physical_network,
segmentation_id)
if network_type in constants.TUNNEL_NETWORK_TYPES:
if self.enable_tunneling:
self._local_vlan_for_tunnel(lvid, network_type,
segmentation_id)
else:
LOG.error(_("Cannot provision %(network_type)s network for "
"net-id=%(net_uuid)s - tunneling disabled"),
{'network_type': network_type,
'net_uuid': net_uuid})
elif network_type == p_const.TYPE_FLAT:
if physical_network in self.phys_brs:
self._local_vlan_for_flat(lvid, physical_network)
else:
LOG.error(_("Cannot provision flat network for "
"net-id=%(net_uuid)s - no bridge for "
"physical_network %(physical_network)s"),
{'net_uuid': net_uuid,
'physical_network': physical_network})
elif network_type == p_const.TYPE_VLAN:
if physical_network in self.phys_brs:
self._local_vlan_for_vlan(lvid, physical_network,
segmentation_id)
else:
LOG.error(_("Cannot provision VLAN network for "
"net-id=%(net_uuid)s - no bridge for "
"physical_network %(physical_network)s"),
{'net_uuid': net_uuid,
'physical_network': physical_network})
elif network_type == p_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:
LOG.error(_("Cannot provision unknown network type "
"%(network_type)s for net-id=%(net_uuid)s"),
{'network_type': network_type,
'net_uuid': net_uuid})
def _reclaim_local_vlan_outbound(self, lvm):
br = self.phys_brs[lvm.physical_network]
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
match = ofpp.OFPMatch(
in_port=int(self.phys_ofports[lvm.physical_network]),
vlan_vid=int(lvm.vlan) | ofp.OFPVID_PRESENT)
msg = ofpp.OFPFlowMod(datapath, table_id=ofp.OFPTT_ALL,
command=ofp.OFPFC_DELETE, out_group=ofp.OFPG_ANY,
out_port=ofp.OFPP_ANY, match=match)
self.ryu_send_msg(msg)
def _reclaim_local_vlan_inbound(self, lvm):
datapath = self.int_br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
if lvm.network_type == p_const.TYPE_FLAT:
vid = ofp.OFPVID_NONE
else: # p_const.TYPE_VLAN
vid = lvm.segmentation_id | ofp.OFPVID_PRESENT
match = ofpp.OFPMatch(
in_port=int(self.int_ofports[lvm.physical_network]),
vlan_vid=vid)
msg = ofpp.OFPFlowMod(datapath, table_id=ofp.OFPTT_ALL,
command=ofp.OFPFC_DELETE, out_group=ofp.OFPG_ANY,
out_port=ofp.OFPP_ANY, match=match)
self.ryu_send_msg(msg)
def reclaim_local_vlan(self, net_uuid):
"""Reclaim a local VLAN.
:param net_uuid: the network uuid associated with this vlan.
:param lvm: a LocalVLANMapping object that tracks (vlan, lsw_id,
vif_ids) mapping.
"""
lvm = self.local_vlan_map.pop(net_uuid, None)
if lvm is None:
LOG.debug(_("Network %s not used on agent."), net_uuid)
return
LOG.info(_("Reclaiming vlan = %(vlan_id)s from net-id = %(net_uuid)s"),
{'vlan_id': lvm.vlan,
'net_uuid': net_uuid})
if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:
if self.enable_tunneling:
match = self.tun_br.ofparser.OFPMatch(
tunnel_id=int(lvm.segmentation_id))
msg = self.tun_br.ofparser.OFPFlowMod(
self.tun_br.datapath,
table_id=constants.TUN_TABLE[lvm.network_type],
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY,
match=match)
self.ryu_send_msg(msg)
match = self.tun_br.ofparser.OFPMatch(
vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)
msg = self.tun_br.ofparser.OFPFlowMod(
self.tun_br.datapath,
table_id=ryu_ofp13.OFPTT_ALL,
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY,
match=match)
self.ryu_send_msg(msg)
elif lvm.network_type in (p_const.TYPE_FLAT, p_const.TYPE_VLAN):
if lvm.physical_network in self.phys_brs:
self._reclaim_local_vlan_outbound(lvm)
self._reclaim_local_vlan_inbound(lvm)
elif lvm.network_type == p_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:
LOG.error(_("Cannot reclaim unknown network type "
"%(network_type)s for net-id=%(net_uuid)s"),
{'network_type': lvm.network_type,
'net_uuid': net_uuid})
self.available_local_vlans.add(lvm.vlan)
def port_bound(self, port, net_uuid,
network_type, physical_network, segmentation_id):
"""Bind port to net_uuid/lsw_id and install flow for inbound traffic
to vm.
:param port: a ovs_lib.VifPort object.
:param net_uuid: the net_uuid this port is to be associated with.
:param network_type: the network type ('gre', 'vlan', 'flat', 'local')
:param physical_network: the physical network for 'vlan' or 'flat'
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
"""
if net_uuid not in self.local_vlan_map:
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
lvm.vif_ports[port.vif_id] = port
# Do not bind a port if it's already bound
cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
if cur_tag != str(lvm.vlan):
self.int_br.set_db_attribute("Port", port.port_name, "tag",
str(lvm.vlan))
if port.ofport != -1:
match = self.int_br.ofparser.OFPMatch(in_port=port.ofport)
msg = self.int_br.ofparser.OFPFlowMod(
self.int_br.datapath,
table_id=ryu_ofp13.OFPTT_ALL,
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY,
match=match)
self.ryu_send_msg(msg)
def port_unbound(self, vif_id, net_uuid=None):
"""Unbind port.
Removes corresponding local vlan mapping object if this is its last
VIF.
:param vif_id: the id of the vif
:param net_uuid: the net_uuid this port is associated with.
"""
net_uuid = net_uuid or self.get_net_uuid(vif_id)
if not self.local_vlan_map.get(net_uuid):
LOG.info(_('port_unbound() net_uuid %s not in local_vlan_map'),
net_uuid)
return
lvm = self.local_vlan_map[net_uuid]
lvm.vif_ports.pop(vif_id, None)
if not lvm.vif_ports:
self.reclaim_local_vlan(net_uuid)
def port_dead(self, port):
"""Once a port has no binding, put it on the "dead vlan".
:param port: a ovs_lib.VifPort object.
"""
# Don't kill a port if it's already dead
cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
if cur_tag != DEAD_VLAN_TAG:
self.int_br.set_db_attribute("Port", port.port_name, "tag",
DEAD_VLAN_TAG)
match = self.int_br.ofparser.OFPMatch(in_port=port.ofport)
msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
priority=2, match=match)
self.ryu_send_msg(msg)
def setup_integration_br(self):
"""Setup the integration bridge.
Create patch ports and remove all existing flows.
:param bridge_name: the name of the integration bridge.
:returns: the integration bridge
"""
self.int_br.setup_ofp()
self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port)
msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
table_id=ryu_ofp13.OFPTT_ALL,
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY)
self.ryu_send_msg(msg)
# switch all traffic using L2 learning
actions = [self.int_br.ofparser.OFPActionOutput(
ryu_ofp13.OFPP_NORMAL, 0)]
instructions = [self.int_br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS,
actions)]
msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
priority=1,
instructions=instructions)
self.ryu_send_msg(msg)
def setup_ancillary_bridges(self, integ_br, tun_br):
"""Setup ancillary bridges - for example br-ex."""
ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))
# Remove all known bridges
ovs_bridges.remove(integ_br)
if self.enable_tunneling:
ovs_bridges.remove(tun_br)
br_names = [self.phys_brs[physical_network].br_name for
physical_network in self.phys_brs]
ovs_bridges.difference_update(br_names)
# Filter list of bridges to those that have external
# bridge-id's configured
br_names = [
bridge for bridge in ovs_bridges
if bridge != ovs_lib.get_bridge_external_bridge_id(
self.root_helper, bridge)
]
ovs_bridges.difference_update(br_names)
ancillary_bridges = []
for bridge in ovs_bridges:
br = OVSBridge(bridge, self.root_helper, self.ryuapp)
ancillary_bridges.append(br)
LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges)
return ancillary_bridges
def _tun_br_sort_incoming_traffic_depend_in_port(self, br):
match = br.ofparser.OFPMatch(
in_port=int(self.patch_int_ofport))
instructions = [br.ofparser.OFPInstructionGotoTable(
table_id=constants.PATCH_LV_TO_TUN)]
msg = br.ofparser.OFPFlowMod(br.datapath,
priority=1,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
msg = br.ofparser.OFPFlowMod(br.datapath, priority=0)
self.ryu_send_msg(msg)
def _tun_br_goto_table_ucast_unicast(self, br):
match = br.ofparser.OFPMatch(eth_dst=('00:00:00:00:00:00',
'01:00:00:00:00:00'))
instructions = [br.ofparser.OFPInstructionGotoTable(
table_id=constants.UCAST_TO_TUN)]
msg = br.ofparser.OFPFlowMod(br.datapath,
table_id=constants.PATCH_LV_TO_TUN,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
def _tun_br_goto_table_flood_broad_multi_cast(self, br):
match = br.ofparser.OFPMatch(eth_dst=('01:00:00:00:00:00',
'01:00:00:00:00:00'))
instructions = [br.ofparser.OFPInstructionGotoTable(
table_id=constants.FLOOD_TO_TUN)]
msg = br.ofparser.OFPFlowMod(br.datapath,
table_id=constants.PATCH_LV_TO_TUN,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
def _tun_br_set_table_tun_by_tunnel_type(self, br):
for tunnel_type in constants.TUNNEL_NETWORK_TYPES:
msg = br.ofparser.OFPFlowMod(
br.datapath,
table_id=constants.TUN_TABLE[tunnel_type],
priority=0)
self.ryu_send_msg(msg)
def _tun_br_output_patch_int(self, br):
actions = [br.ofparser.OFPActionOutput(
int(self.patch_int_ofport), 0)]
instructions = [br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS,
actions)]
msg = br.ofparser.OFPFlowMod(br.datapath,
table_id=constants.LEARN_FROM_TUN,
priority=1,
instructions=instructions)
self.ryu_send_msg(msg)
def _tun_br_goto_table_flood_unknown_unicast(self, br):
instructions = [br.ofparser.OFPInstructionGotoTable(
table_id=constants.FLOOD_TO_TUN)]
msg = br.ofparser.OFPFlowMod(br.datapath,
table_id=constants.UCAST_TO_TUN,
priority=0,
instructions=instructions)
self.ryu_send_msg(msg)
def _tun_br_default_drop(self, br):
msg = br.ofparser.OFPFlowMod(
br.datapath,
table_id=constants.FLOOD_TO_TUN,
priority=0)
self.ryu_send_msg(msg)
def setup_tunnel_br(self, tun_br):
"""Setup the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
using a patch port.
:param tun_br: the name of the tunnel bridge.
"""
self.tun_br = OVSBridge(tun_br, self.root_helper, self.ryuapp)
self.tun_br.reset_bridge()
self.tun_br.setup_ofp()
self.patch_tun_ofport = self.int_br.add_patch_port(
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
self.patch_int_ofport = self.tun_br.add_patch_port(
cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
LOG.error(_("Failed to create OVS patch port. Cannot have "
"tunneling enabled on this agent, since this version "
"of OVS does not support tunnels or patch ports. "
"Agent terminated!"))
raise SystemExit(1)
msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
table_id=ryu_ofp13.OFPTT_ALL,
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY)
self.ryu_send_msg(msg)
self._tun_br_sort_incoming_traffic_depend_in_port(self.tun_br)
self._tun_br_goto_table_ucast_unicast(self.tun_br)
self._tun_br_goto_table_flood_broad_multi_cast(self.tun_br)
self._tun_br_set_table_tun_by_tunnel_type(self.tun_br)
self._tun_br_output_patch_int(self.tun_br)
self._tun_br_goto_table_flood_unknown_unicast(self.tun_br)
self._tun_br_default_drop(self.tun_br)
def _phys_br_prepare_create_veth(self, br, int_veth_name, phys_veth_name):
self.int_br.delete_port(int_veth_name)
br.delete_port(phys_veth_name)
if ip_lib.device_exists(int_veth_name, self.root_helper):
ip_lib.IPDevice(int_veth_name, self.root_helper).link.delete()
# Give udev a chance to process its rules here, to avoid
# race conditions between commands launched by udev rules
# and the subsequent call to ip_wrapper.add_veth
utils.execute(['/sbin/udevadm', 'settle', '--timeout=10'])
def _phys_br_create_veth(self, br, int_veth_name,
phys_veth_name, physical_network, ip_wrapper):
int_veth, phys_veth = ip_wrapper.add_veth(int_veth_name,
phys_veth_name)
self.int_ofports[physical_network] = self.int_br.add_port(int_veth)
self.phys_ofports[physical_network] = br.add_port(phys_veth)
return (int_veth, phys_veth)
def _phys_br_block_untranslated_traffic(self, br, physical_network):
match = self.int_br.ofparser.OFPMatch(in_port=int(
self.int_ofports[physical_network]))
msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
priority=2, match=match)
self.ryu_send_msg(msg)
match = br.ofparser.OFPMatch(in_port=int(
self.phys_ofports[physical_network]))
msg = br.ofparser.OFPFlowMod(br.datapath, priority=2, match=match)
self.ryu_send_msg(msg)
def _phys_br_enable_veth_to_pass_traffic(self, int_veth, phys_veth):
# enable veth to pass traffic
int_veth.link.set_up()
phys_veth.link.set_up()
if self.veth_mtu:
# set up mtu size for veth interfaces
int_veth.link.set_mtu(self.veth_mtu)
phys_veth.link.set_mtu(self.veth_mtu)
def _phys_br_patch_physical_bridge_with_integration_bridge(
self, br, physical_network, bridge, ip_wrapper):
int_veth_name = constants.VETH_INTEGRATION_PREFIX + bridge
phys_veth_name = constants.VETH_PHYSICAL_PREFIX + bridge
self._phys_br_prepare_create_veth(br, int_veth_name, phys_veth_name)
int_veth, phys_veth = self._phys_br_create_veth(br, int_veth_name,
phys_veth_name,
physical_network,
ip_wrapper)
self._phys_br_block_untranslated_traffic(br, physical_network)
self._phys_br_enable_veth_to_pass_traffic(int_veth, phys_veth)
def setup_physical_bridges(self, bridge_mappings):
"""Setup the physical network bridges.
Creates physical network bridges and links them to the
integration bridge using veths.
:param bridge_mappings: map physical network names to bridge names.
"""
self.phys_brs = {}
self.int_ofports = {}
self.phys_ofports = {}
ip_wrapper = ip_lib.IPWrapper(self.root_helper)
for physical_network, bridge in bridge_mappings.iteritems():
LOG.info(_("Mapping physical network %(physical_network)s to "
"bridge %(bridge)s"),
{'physical_network': physical_network,
'bridge': bridge})
# setup physical bridge
if not ip_lib.device_exists(bridge, self.root_helper):
LOG.error(_("Bridge %(bridge)s for physical network "
"%(physical_network)s does not exist. Agent "
"terminated!"),
{'physical_network': physical_network,
'bridge': bridge})
raise SystemExit(1)
br = OVSBridge(bridge, self.root_helper, self.ryuapp)
br.setup_ofp()
msg = br.ofparser.OFPFlowMod(br.datapath,
table_id=ryu_ofp13.OFPTT_ALL,
command=ryu_ofp13.OFPFC_DELETE,
out_group=ryu_ofp13.OFPG_ANY,
out_port=ryu_ofp13.OFPP_ANY)
self.ryu_send_msg(msg)
actions = [br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
instructions = [br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS,
actions)]
msg = br.ofparser.OFPFlowMod(br.datapath,
priority=1,
instructions=instructions)
self.ryu_send_msg(msg)
self.phys_brs[physical_network] = br
self._phys_br_patch_physical_bridge_with_integration_bridge(
br, physical_network, bridge, ip_wrapper)
def scan_ports(self, registered_ports, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
port_info = {'current': cur_ports}
if updated_ports is None:
updated_ports = set()
updated_ports.update(self._find_lost_vlan_port(registered_ports))
if updated_ports:
# Some updated ports might have been removed in the
# meanwhile, and therefore should not be processed.
# In this case the updated port won't be found among
# current ports.
updated_ports &= cur_ports
if updated_ports:
port_info['updated'] = updated_ports
if cur_ports == registered_ports:
# No added or removed ports to set, just return here
return port_info
port_info['added'] = cur_ports - registered_ports
# Remove all the known ports not found on the integration bridge
port_info['removed'] = registered_ports - cur_ports
return port_info
def _find_lost_vlan_port(self, registered_ports):
"""Return ports which have lost their vlan tag.
The returned value is a set of port ids of the ports concerned by a
vlan tag loss.
"""
port_tags = self.int_br.get_port_tag_dict()
changed_ports = set()
for lvm in self.local_vlan_map.values():
for port in registered_ports:
if (
port in lvm.vif_ports
and lvm.vif_ports[port].port_name in port_tags
and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan
):
LOG.info(
_("Port '%(port_name)s' has lost "
"its vlan tag '%(vlan_tag)d'!"),
{'port_name': lvm.vif_ports[port].port_name,
'vlan_tag': lvm.vlan}
)
changed_ports.add(port)
return changed_ports
def update_ancillary_ports(self, registered_ports):
ports = set()
for bridge in self.ancillary_brs:
ports |= bridge.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
physical_network, segmentation_id, admin_state_up):
if vif_port:
# When this function is called for a port, the port should have
# an OVS ofport configured, as only these ports were considered
# for being treated. If that does not happen, it is a potential
# error condition of which operators should be aware
if not vif_port.ofport:
LOG.warn(_("VIF port: %s has no ofport configured, and might "
"not be able to transmit"), vif_port.vif_id)
if admin_state_up:
self.port_bound(vif_port, network_id, network_type,
physical_network, segmentation_id)
else:
self.port_dead(vif_port)
else:
LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
def setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
ofport = self.tun_br.add_tunnel_port(port_name,
remote_ip,
self.local_ip,
tunnel_type,
self.vxlan_udp_port,
self.dont_fragment)
ofport_int = -1
try:
ofport_int = int(ofport)
except (TypeError, ValueError):
LOG.exception(_("ofport should have a value that can be "
"interpreted as an integer"))
if ofport_int < 0:
LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': tunnel_type, 'ip': remote_ip})
return 0
self.tun_br_ofports[tunnel_type][remote_ip] = ofport
# Add flow in default table to resubmit to the right
# tunelling table (lvid will be set in the latter)
match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))
instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(
table_id=constants.TUN_TABLE[tunnel_type])]
msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
priority=1,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
ofports = [int(p) for p in self.tun_br_ofports[tunnel_type].values()]
if ofports:
# Update flooding flows to include the new tunnel
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
if vlan_mapping.network_type == tunnel_type:
match = self.tun_br.ofparser.OFPMatch(
vlan_vid=int(vlan_mapping.vlan) |
ryu_ofp13.OFPVID_PRESENT)
actions = [
self.tun_br.ofparser.OFPActionPopVlan(),
self.tun_br.ofparser.OFPActionSetField(
tunnel_id=int(vlan_mapping.segmentation_id))]
actions.extend(
self.tun_br.ofparser.OFPActionOutput(p, 0)
for p in ofports
)
instructions = [
self.tun_br.ofparser.OFPInstructionActions(
ryu_ofp13.OFPIT_APPLY_ACTIONS,
actions)]
msg = self.tun_br.ofparser.OFPFlowMod(
self.tun_br.datapath,
table_id=constants.FLOOD_TO_TUN,
priority=1,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
return ofport
def treat_devices_added_or_updated(self, devices):
resync = False
for device in devices:
LOG.debug(_("Processing port %s"), device)
port = self.int_br.get_vif_port_by_id(device)
if not port:
# The port has disappeared and should not be processed
# There is no need to put the port DOWN in the plugin as
# it never went up in the first place
LOG.info(_("Port %s was not found on the integration bridge "
"and will therefore not be processed"), device)
continue
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug(_("Unable to get port details for "
"%(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
continue
if 'port_id' in details:
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': details})
self.treat_vif_port(port, details['port_id'],
details['network_id'],
details['network_type'],
details['physical_network'],
details['segmentation_id'],
details['admin_state_up'])
# update plugin about port status
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
self.plugin_rpc.update_device_up(
self.context, device, self.agent_id, cfg.CONF.host)
else:
LOG.debug(_("Setting status for %s to DOWN"), device)
self.plugin_rpc.update_device_down(
self.context, device, self.agent_id, cfg.CONF.host)
LOG.info(_("Configuration for device %s completed."), device)
else:
LOG.warn(_("Device %s not defined on plugin"), device)
if (port and port.ofport != -1):
self.port_dead(port)
return resync
def treat_ancillary_devices_added(self, devices):
resync = False
for device in devices:
LOG.info(_("Ancillary Port %s added"), device)
try:
self.plugin_rpc.get_device_details(self.context, device,
self.agent_id)
except Exception as e:
LOG.debug(_("Unable to get port details for "
"%(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
continue
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
return resync
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
for device in devices:
LOG.info(_("Attachment %s removed"), device)
try:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
continue
self.port_unbound(device)
return resync
def treat_ancillary_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info(_("Attachment %s removed"), device)
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
continue
if details['exists']:
LOG.info(_("Port %s updated."), device)
# Nothing to do regarding local networking
else:
LOG.debug(_("Device %s not defined on plugin"), device)
return resync
def process_network_ports(self, port_info):
resync_add = False
resync_removed = False
# If there is an exception while processing security groups ports
# will not be wired anyway, and a resync will be triggered
self.sg_agent.setup_port_filters(port_info.get('added', set()),
port_info.get('updated', set()))
# VIF wiring needs to be performed always for 'new' devices.
# For updated ports, re-wiring is not needed in most cases, but needs
# to be performed anyway when the admin state of a device is changed.
# A device might be both in the 'added' and 'updated'
# list at the same time; avoid processing it twice.
devices_added_updated = (port_info.get('added', set()) |
port_info.get('updated', set()))
if devices_added_updated:
start = time.time()
resync_add = self.treat_devices_added_or_updated(
devices_added_updated)
LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed "
"in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
if 'removed' in port_info:
start = time.time()
resync_removed = self.treat_devices_removed(port_info['removed'])
LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_removed completed in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# If one of the above opertaions fails => resync with plugin
return (resync_add | resync_removed)
def process_ancillary_network_ports(self, port_info):
resync_add = False
resync_removed = False
if 'added' in port_info:
start = time.time()
resync_add = self.treat_ancillary_devices_added(port_info['added'])
LOG.debug(_("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_added "
"completed in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
if 'removed' in port_info:
start = time.time()
resync_removed = self.treat_ancillary_devices_removed(
port_info['removed'])
LOG.debug(_("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_removed "
"completed in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# If one of the above opertaions fails => resync with plugin
return (resync_add | resync_removed)
def tunnel_sync(self):
resync = False
try:
for tunnel_type in self.tunnel_types:
details = self.plugin_rpc.tunnel_sync(self.context,
self.local_ip,
tunnel_type)
tunnels = details['tunnels']
for tunnel in tunnels:
if self.local_ip != tunnel['ip_address']:
tun_name = self._create_tunnel_port_name(
tunnel_type, tunnel['ip_address'])
if not tun_name:
continue
self.setup_tunnel_port(tun_name,
tunnel['ip_address'],
tunnel_type)
except Exception as e:
LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
{'local_ip': self.local_ip, 'e': e})
resync = True
return resync
def _agent_has_updates(self, polling_manager):
return (polling_manager.is_polling_required or
self.updated_ports or
self.sg_agent.firewall_refresh_needed())
def _port_info_has_changes(self, port_info):
return (port_info.get('added') or
port_info.get('removed') or
port_info.get('updated'))
def ovsdb_monitor_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
sync = True
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
while True:
start = time.time()
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0},
'ancillary': {'added': 0, 'removed': 0}}
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%d started"),
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
ancillary_ports.clear()
sync = False
polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
try:
tunnel_sync = self.tunnel_sync()
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
if self._agent_has_updates(polling_manager):
try:
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Save updated ports dict to perform rollback in
# case resync would be needed, and then clear
# self.updated_ports. As the greenthread should not yield
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
ports = port_info['current']
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self.sg_agent.firewall_refresh_needed()):
LOG.debug(_("Starting to process devices in:%s"),
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
port_stats['regular']['added'] = (
len(port_info.get('added', [])))
port_stats['regular']['updated'] = (
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
# Treat ancillary devices if they exist
if self.ancillary_brs:
port_info = self.update_ancillary_ports(
ancillary_ports)
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
if port_info:
rc = self.process_ancillary_network_ports(
port_info)
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:"
"%(iter_num)d - ancillary ports "
"processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
ancillary_ports = port_info['current']
port_stats['ancillary']['added'] = (
len(port_info.get('added', [])))
port_stats['ancillary']['removed'] = (
len(port_info.get('removed', [])))
sync = sync | rc
polling_manager.polling_completed()
except Exception:
LOG.exception(_("Error while processing VIF ports"))
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
"completed. Processed ports statistics:"
"%(port_stats)s. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'port_stats': port_stats,
'elapsed': elapsed})
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug(_("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!"),
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
self.iter_num = self.iter_num + 1
def daemon_loop(self):
with polling.get_polling_manager(
self.minimize_polling,
self.root_helper,
self.ovsdb_monitor_respawn_interval) as pm:
self.ovsdb_monitor_loop(polling_manager=pm)
def create_agent_config_map(config):
"""Create a map of agent config parameters.
:param config: an instance of cfg.CONF
:returns: a map of agent configuration parameters
"""
try:
bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
except ValueError as e:
raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
kwargs = dict(
integ_br=config.OVS.integration_bridge,
tun_br=config.OVS.tunnel_bridge,
local_ip=config.OVS.local_ip,
bridge_mappings=bridge_mappings,
root_helper=config.AGENT.root_helper,
polling_interval=config.AGENT.polling_interval,
minimize_polling=config.AGENT.minimize_polling,
tunnel_types=config.AGENT.tunnel_types,
veth_mtu=config.AGENT.veth_mtu,
l2_population=False,
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
)
# If enable_tunneling is TRUE, set tunnel_type to default to GRE
if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
kwargs['tunnel_types'] = [p_const.TYPE_GRE]
# Verify the tunnel_types specified are valid
for tun in kwargs['tunnel_types']:
if tun not in constants.TUNNEL_NETWORK_TYPES:
msg = _('Invalid tunnel type specificed: %s'), tun
raise ValueError(msg)
if not kwargs['local_ip']:
msg = _('Tunneling cannot be enabled without a valid local_ip.')
raise ValueError(msg)
return kwargs