OVS flows apply concurrently using a deferred OVSBridge

This change is an improvement of the commit
5012136868 and provides a cleaner
implementation. Previously flows were applied on
OVSBridge.defer_apply_off which could be called by an other
greenthread: it was impossible to ensure that all flows are applied
in a unique OVSBridge.defer_apply_off call. This change ensures that
all flows defined using a DeferredOVSBridge are applied on
DeferredOVSBridge.apply_flows or DeferredOVSBridge.__exit__ if not
exception is raised.

Author:         Cedric Brandily <zzelle@gmail.com>
Co-Authored-By: Edouard Thuleau <edouard.thuleau@cloudwatt.com>

Related-bug: #1263866
Change-Id: I1f260629ef95b98ee80e2ff946c3606da8fe7608
This commit is contained in:
cedric.brandily 2014-05-16 16:18:45 -04:00 committed by Cedric Brandily
parent 6b83612dab
commit cf2c2ff4ab
9 changed files with 542 additions and 421 deletions

View File

@ -74,7 +74,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
'''
@abc.abstractmethod
def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
'''Add flow for fdb
This method assumes to be used by method fdb_add_tun.
@ -82,7 +82,10 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
on bridge.
And you may edit some information for local arp respond.
:param br: represent the bridge on which add_fdb_flow should be
applied.
:param port_info: list to include mac and ip.
[mac, ip]
:remote_ip: remote ip address.
:param lvm: a local VLAN map of network.
@ -91,7 +94,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
pass
@abc.abstractmethod
def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
'''Delete flow for fdb
This method assumes to be used by method fdb_remove_tun.
@ -99,6 +102,8 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
from bridge.
And you may delete some information for local arp respond.
:param br: represent the bridge on which del_fdb_flow should be
applied.
:param port_info: a list to contain mac and ip.
[mac, ip]
:remote_ip: remote ip address.
@ -108,7 +113,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
pass
@abc.abstractmethod
def setup_tunnel_port(self, remote_ip, network_type):
def setup_tunnel_port(self, br, remote_ip, network_type):
'''Setup an added tunnel port.
This method assumes to be used by method fdb_add_tun.
@ -116,6 +121,8 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
a port to a bridge.
If you need, you may do some preparation for a bridge.
:param br: represent the bridge on which setup_tunnel_port should be
applied.
:param remote_ip: an ip for port to setup.
:param network_type: a type of network.
:returns: a ofport value. the value 0 means to be unavailable port.
@ -123,7 +130,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
pass
@abc.abstractmethod
def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
'''Clean up a deleted tunnel port.
This method assumes to be used by method fdb_remove_tun.
@ -131,19 +138,23 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
deleting a port from a bridge.
If you need, you may do some cleanup for a bridge.
:param br: represent the bridge on which cleanup_tunnel_port should be
applied.
:param tun_ofport: a port value to cleanup.
:param tunnel_type: a type of tunnel.
'''
pass
@abc.abstractmethod
def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
'''Operate the ARP respond information.
Do operation of arp respond information for an action
In ovs do adding or removing flow entry to edit an arp reply.
:param br: represent the bridge on which setup_entry_for_arp_reply
should be applied.
:param action: an action to operate for arp respond infomation.
"add" or "remove"
:param local_vid: id in local VLAN map of network's ARP entry.
@ -159,28 +170,29 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
yield (lvm, agent_ports)
@log.log
def fdb_add_tun(self, context, lvm, agent_ports, ofports):
def fdb_add_tun(self, context, br, lvm, agent_ports, ofports):
for remote_ip, ports in agent_ports.items():
# Ensure we have a tunnel port with this remote agent
ofport = ofports[lvm.network_type].get(remote_ip)
if not ofport:
ofport = self.setup_tunnel_port(remote_ip, lvm.network_type)
ofport = self.setup_tunnel_port(br, remote_ip,
lvm.network_type)
if ofport == 0:
continue
for port in ports:
self.add_fdb_flow(port, remote_ip, lvm, ofport)
self.add_fdb_flow(br, port, remote_ip, lvm, ofport)
@log.log
def fdb_remove_tun(self, context, lvm, agent_ports, ofports):
def fdb_remove_tun(self, context, br, lvm, agent_ports, ofports):
for remote_ip, ports in agent_ports.items():
ofport = ofports[lvm.network_type].get(remote_ip)
if not ofport:
continue
for port in ports:
self.del_fdb_flow(port, remote_ip, lvm, ofport)
self.del_fdb_flow(br, port, remote_ip, lvm, ofport)
if port == n_const.FLOODING_ENTRY:
# Check if this tunnel port is still used
self.cleanup_tunnel_port(ofport, lvm.network_type)
self.cleanup_tunnel_port(br, ofport, lvm.network_type)
@log.log
def fdb_update(self, context, fdb_entries):
@ -198,13 +210,16 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
getattr(self, method)(context, values)
@log.log
def fdb_chg_ip_tun(self, context, fdb_entries, local_ip, local_vlan_map):
def fdb_chg_ip_tun(self, context, br, fdb_entries, local_ip,
local_vlan_map):
'''fdb update when an IP of a port is updated.
The ML2 l2-pop mechanism driver sends an fdb update rpc message when an
IP of a port is updated.
:param context: RPC context.
:param br: represent the bridge on which fdb_chg_ip_tun should be
applied.
:param fdb_entries: fdb dicts that contain all mac/IP informations per
agent and network.
{'net1':
@ -231,8 +246,10 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
after = state.get('after')
for mac, ip in after:
self.setup_entry_for_arp_reply('add', lvm.vlan, mac, ip)
self.setup_entry_for_arp_reply(br, 'add', lvm.vlan, mac,
ip)
before = state.get('before')
for mac, ip in before:
self.setup_entry_for_arp_reply('remove', lvm.vlan, mac, ip)
self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan, mac,
ip)

View File

@ -13,6 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import operator
from oslo.config import cfg
from neutron.agent.linux import ip_lib
@ -103,8 +106,6 @@ class OVSBridge(BaseOVS):
def __init__(self, br_name, root_helper):
super(OVSBridge, self).__init__(root_helper)
self.br_name = br_name
self.defer_apply_flows = False
self.deferred_flows = {'add': '', 'mod': '', 'del': ''}
def set_controller(self, controller_names):
vsctl_command = ['--', 'set-controller', self.br_name]
@ -188,26 +189,18 @@ class OVSBridge(BaseOVS):
return self.db_get_val('Bridge',
self.br_name, 'datapath_id').strip('"')
def do_action_flows(self, action, kwargs_list):
flow_strs = [_build_flow_expr_str(kw, action) for kw in kwargs_list]
self.run_ofctl('%s-flows' % action, ['-'], '\n'.join(flow_strs))
def add_flow(self, **kwargs):
flow_str = _build_flow_expr_str(kwargs, 'add')
if self.defer_apply_flows:
self.deferred_flows['add'] += flow_str + '\n'
else:
self.run_ofctl("add-flow", [flow_str])
self.do_action_flows('add', [kwargs])
def mod_flow(self, **kwargs):
flow_str = _build_flow_expr_str(kwargs, 'mod')
if self.defer_apply_flows:
self.deferred_flows['mod'] += flow_str + '\n'
else:
self.run_ofctl("mod-flows", [flow_str])
self.do_action_flows('mod', [kwargs])
def delete_flows(self, **kwargs):
flow_expr_str = _build_flow_expr_str(kwargs, 'del')
if self.defer_apply_flows:
self.deferred_flows['del'] += flow_expr_str + '\n'
else:
self.run_ofctl("del-flows", [flow_expr_str])
self.do_action_flows('del', [kwargs])
def dump_flows_for_table(self, table):
retval = None
@ -218,39 +211,8 @@ class OVSBridge(BaseOVS):
if 'NXST' not in item)
return retval
def defer_apply_on(self):
# TODO(vivek): when defer_apply_on is used, DVR
# flows are only getting partially configured when
# run concurrently with l2-pop ON.
# Will need make ovs_lib flow API context sensitive
# and then use the same across this file, which will
# address the race issue here.
LOG.debug(_('defer_apply_on'))
self.defer_apply_flows = True
def defer_apply_off(self):
# TODO(vivek): when defer_apply_off is used, DVR
# flows are only getting partially configured when
# run concurrently with l2-pop ON.
# Will need make ovs_lib flow API context sensitive
# and then use the same across this file, which will
# address the race issue here.
LOG.debug(_('defer_apply_off'))
# Note(ethuleau): stash flows and disable deferred mode. Then apply
# flows from the stashed reference to be sure to not purge flows that
# were added between two ofctl commands.
stashed_deferred_flows, self.deferred_flows = (
self.deferred_flows, {'add': '', 'mod': '', 'del': ''}
)
self.defer_apply_flows = False
for action, flows in stashed_deferred_flows.items():
if flows:
LOG.debug(_('Applying following deferred flows '
'to bridge %s'), self.br_name)
for line in flows.splitlines():
LOG.debug(_('%(action)s: %(flow)s'),
{'action': action, 'flow': line})
self.run_ofctl('%s-flows' % action, ['-'], flows)
def deferred(self, **kwargs):
return DeferredOVSBridge(self, **kwargs)
def add_tunnel_port(self, port_name, remote_ip, local_ip,
tunnel_type=p_const.TYPE_GRE,
@ -488,6 +450,77 @@ class OVSBridge(BaseOVS):
self.destroy()
class DeferredOVSBridge(object):
'''Deferred OVSBridge.
This class wraps add_flow, mod_flow and delete_flows calls to an OVSBridge
and defers their application until apply_flows call in order to perform
bulk calls. It wraps also ALLOWED_PASSTHROUGHS calls to avoid mixing
OVSBridge and DeferredOVSBridge uses.
This class can be used as a context, in such case apply_flows is called on
__exit__ except if an exception is raised.
This class is not thread-safe, that's why for every use a new instance
must be implemented.
'''
ALLOWED_PASSTHROUGHS = 'add_port', 'delete_port'
def __init__(self, br, full_ordered=False,
order=('add', 'mod', 'del')):
'''Constructor.
:param br: wrapped bridge
:param full_ordered: Optional, disable flow reordering (slower)
:param order: Optional, define in which order flow are applied
'''
self.br = br
self.full_ordered = full_ordered
self.order = order
if not self.full_ordered:
self.weights = dict((y, x) for x, y in enumerate(self.order))
self.action_flow_tuples = []
def __getattr__(self, name):
if name in self.ALLOWED_PASSTHROUGHS:
return getattr(self.br, name)
raise AttributeError(name)
def add_flow(self, **kwargs):
self.action_flow_tuples.append(('add', kwargs))
def mod_flow(self, **kwargs):
self.action_flow_tuples.append(('mod', kwargs))
def delete_flows(self, **kwargs):
self.action_flow_tuples.append(('del', kwargs))
def apply_flows(self):
action_flow_tuples = self.action_flow_tuples
self.action_flow_tuples = []
if not action_flow_tuples:
return
if not self.full_ordered:
action_flow_tuples.sort(key=lambda af: self.weights[af[0]])
grouped = itertools.groupby(action_flow_tuples,
key=operator.itemgetter(0))
itemgetter_1 = operator.itemgetter(1)
for action, action_flow_list in grouped:
flows = map(itemgetter_1, action_flow_list)
self.br.do_action_flows(action, flows)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.apply_flows()
else:
LOG.exception(_("OVS flows could not be applied on bridge %s"),
self.br.br_name)
def get_bridge_for_iface(root_helper, iface):
args = ["ovs-vsctl", "--timeout=%d" % cfg.CONF.ovs_vsctl_timeout,
"iface-to-br", iface]

View File

@ -367,7 +367,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
self.local_vlan_map):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
self.fdb_add_tun(context, lvm, agent_ports,
self.fdb_add_tun(context, self.tun_br, lvm, agent_ports,
self.tun_br_ofports)
def fdb_remove(self, context, fdb_entries):
@ -376,11 +376,11 @@ class OFANeutronAgent(n_rpc.RpcCallback,
self.local_vlan_map):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
self.fdb_remove_tun(context, lvm, agent_ports,
self.fdb_remove_tun(context, self.tun_br, lvm, agent_ports,
self.tun_br_ofports)
def _add_fdb_flooding_flow(self, lvm):
datapath = self.tun_br.datapath
def _add_fdb_flooding_flow(self, br, lvm):
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
match = ofpp.OFPMatch(
@ -399,13 +399,13 @@ class OFANeutronAgent(n_rpc.RpcCallback,
match=match, instructions=instructions)
self.ryu_send_msg(msg)
def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
datapath = self.tun_br.datapath
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
if port_info == n_const.FLOODING_ENTRY:
lvm.tun_ofports.add(ofport)
self._add_fdb_flooding_flow(lvm)
self._add_fdb_flooding_flow(br, lvm)
else:
self.ryuapp.add_arp_table_entry(
lvm.vlan, port_info[1], port_info[0])
@ -425,14 +425,14 @@ class OFANeutronAgent(n_rpc.RpcCallback,
match=match, instructions=instructions)
self.ryu_send_msg(msg)
def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
datapath = self.tun_br.datapath
def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
if port_info == n_const.FLOODING_ENTRY:
lvm.tun_ofports.remove(ofport)
if len(lvm.tun_ofports) > 0:
self._add_fdb_flooding_flow(lvm)
self._add_fdb_flooding_flow(br, lvm)
else:
# This local vlan doesn't require any more tunelling
match = ofpp.OFPMatch(
@ -457,7 +457,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
match=match)
self.ryu_send_msg(msg)
def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
if action == 'add':
self.ryuapp.add_arp_table_entry(local_vid, ip_address, mac_address)
@ -466,8 +466,8 @@ class OFANeutronAgent(n_rpc.RpcCallback,
def _fdb_chg_ip(self, context, fdb_entries):
LOG.debug("update chg_ip received")
self.fdb_chg_ip_tun(
context, fdb_entries, self.local_ip, self.local_vlan_map)
self.fdb_chg_ip_tun(context, self.tun_br, fdb_entries, self.local_ip,
self.local_vlan_map)
def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,
segmentation_id):
@ -673,7 +673,8 @@ class OFANeutronAgent(n_rpc.RpcCallback,
self.ryu_send_msg(msg)
# Try to remove tunnel ports if not used by other networks
for ofport in lvm.tun_ofports:
self.cleanup_tunnel_port(ofport, lvm.network_type)
self.cleanup_tunnel_port(self.tun_br, ofport,
lvm.network_type)
elif lvm.network_type in (p_const.TYPE_FLAT, p_const.TYPE_VLAN):
if lvm.physical_network in self.phys_brs:
self._reclaim_local_vlan_outbound(lvm)
@ -1078,8 +1079,8 @@ class OFANeutronAgent(n_rpc.RpcCallback,
else:
LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
def _setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
ofport = self.tun_br.add_tunnel_port(port_name,
def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
ofport = br.add_tunnel_port(port_name,
remote_ip,
self.local_ip,
tunnel_type,
@ -1099,27 +1100,28 @@ class OFANeutronAgent(n_rpc.RpcCallback,
self.tun_br_ofports[tunnel_type][remote_ip] = ofport
# Add flow in default table to resubmit to the right
# tunelling table (lvid will be set in the latter)
match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))
instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(
match = br.ofparser.OFPMatch(in_port=int(ofport))
instructions = [br.ofparser.OFPInstructionGotoTable(
table_id=constants.TUN_TABLE[tunnel_type])]
msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
msg = br.ofparser.OFPFlowMod(br.datapath,
priority=1,
match=match,
instructions=instructions)
self.ryu_send_msg(msg)
return ofport
def setup_tunnel_port(self, remote_ip, network_type):
def setup_tunnel_port(self, br, remote_ip, network_type):
port_name = self._create_tunnel_port_name(network_type, remote_ip)
if not port_name:
return 0
ofport = self._setup_tunnel_port(port_name,
ofport = self._setup_tunnel_port(br,
port_name,
remote_ip,
network_type)
return ofport
def _remove_tunnel_port(self, tun_ofport, tunnel_type):
datapath = self.tun_br.datapath
def _remove_tunnel_port(self, br, tun_ofport, tunnel_type):
datapath = br.datapath
ofp = datapath.ofproto
ofpp = datapath.ofproto_parser
for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items():
@ -1127,7 +1129,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
port_name = self._create_tunnel_port_name(tunnel_type,
remote_ip)
if port_name:
self.tun_br.delete_port(port_name)
br.delete_port(port_name)
match = ofpp.OFPMatch(in_port=int(ofport))
msg = ofpp.OFPFlowMod(datapath,
command=ofp.OFPFC_DELETE,
@ -1137,14 +1139,14 @@ class OFANeutronAgent(n_rpc.RpcCallback,
self.ryu_send_msg(msg)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
# Check if this tunnel port is still used
for lvm in self.local_vlan_map.values():
if tun_ofport in lvm.tun_ofports:
break
# If not, remove it
else:
self._remove_tunnel_port(tun_ofport, tunnel_type)
self._remove_tunnel_port(br, tun_ofport, tunnel_type)
def treat_devices_added_or_updated(self, devices):
resync = False

View File

@ -332,7 +332,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
return
tun_name = '%s-%s' % (tunnel_type, tunnel_id)
if not self.l2_pop:
self._setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
tunnel_type)
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
@ -341,11 +342,12 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
self.tun_br.defer_apply_on()
self.fdb_add_tun(context, lvm, agent_ports,
self.tun_br_ofports)
if not self.enable_distributed_routing:
self.tun_br.defer_apply_off()
with self.tun_br.deferred() as deferred_br:
self.fdb_add_tun(context, deferred_br, lvm,
agent_ports, self.tun_br_ofports)
else:
self.fdb_add_tun(context, self.tun_br, lvm,
agent_ports, self.tun_br_ofports)
def fdb_remove(self, context, fdb_entries):
LOG.debug("fdb_remove received")
@ -354,59 +356,58 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
self.tun_br.defer_apply_on()
self.fdb_remove_tun(context, lvm, agent_ports,
self.tun_br_ofports)
if not self.enable_distributed_routing:
self.tun_br.defer_apply_off()
with self.tun_br.deferred() as deferred_br:
self.fdb_remove_tun(context, deferred_br, lvm,
agent_ports, self.tun_br_ofports)
else:
self.fdb_remove_tun(context, self.tun_br, lvm,
agent_ports, self.tun_br_ofports)
def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
if port_info == q_const.FLOODING_ENTRY:
lvm.tun_ofports.add(ofport)
ofports = ','.join(lvm.tun_ofports)
self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
br.mod_flow(table=constants.FLOOD_TO_TUN,
dl_vlan=lvm.vlan,
actions="strip_vlan,set_tunnel:%s,"
"output:%s" % (lvm.segmentation_id, ofports))
actions="strip_vlan,set_tunnel:%s,output:%s" %
(lvm.segmentation_id, ofports))
else:
self.setup_entry_for_arp_reply('add', lvm.vlan, port_info[0],
self.setup_entry_for_arp_reply(br, 'add', lvm.vlan, port_info[0],
port_info[1])
if not self.dvr_agent.is_dvr_router_interface(port_info[1]):
self.tun_br.add_flow(table=constants.UCAST_TO_TUN,
br.add_flow(table=constants.UCAST_TO_TUN,
priority=2,
dl_vlan=lvm.vlan,
dl_dst=port_info[0],
actions="strip_vlan,set_tunnel:%s,"
"output:%s" %
actions="strip_vlan,set_tunnel:%s,output:%s" %
(lvm.segmentation_id, ofport))
def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
if port_info == q_const.FLOODING_ENTRY:
lvm.tun_ofports.remove(ofport)
if len(lvm.tun_ofports) > 0:
ofports = ','.join(lvm.tun_ofports)
self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
br.mod_flow(table=constants.FLOOD_TO_TUN,
dl_vlan=lvm.vlan,
actions="strip_vlan,"
"set_tunnel:%s,output:%s" %
actions="strip_vlan,set_tunnel:%s,output:%s" %
(lvm.segmentation_id, ofports))
else:
# This local vlan doesn't require any more tunnelling
self.tun_br.delete_flows(table=constants.FLOOD_TO_TUN,
dl_vlan=lvm.vlan)
br.delete_flows(table=constants.FLOOD_TO_TUN, dl_vlan=lvm.vlan)
else:
self.setup_entry_for_arp_reply('remove', lvm.vlan, port_info[0],
port_info[1])
self.tun_br.delete_flows(table=constants.UCAST_TO_TUN,
self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan,
port_info[0], port_info[1])
br.delete_flows(table=constants.UCAST_TO_TUN,
dl_vlan=lvm.vlan,
dl_dst=port_info[0])
def _fdb_chg_ip(self, context, fdb_entries):
LOG.debug("update chg_ip received")
self.fdb_chg_ip_tun(
context, fdb_entries, self.local_ip, self.local_vlan_map)
with self.tun_br.deferred() as deferred_br:
self.fdb_chg_ip_tun(context, deferred_br, fdb_entries,
self.local_ip, self.local_vlan_map)
def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
'''Set the ARP respond entry.
@ -422,14 +423,14 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
if action == 'add':
actions = constants.ARP_RESPONDER_ACTIONS % {'mac': mac, 'ip': ip}
self.tun_br.add_flow(table=constants.ARP_RESPONDER,
br.add_flow(table=constants.ARP_RESPONDER,
priority=1,
proto='arp',
dl_vlan=local_vid,
nw_dst='%s' % ip,
actions=actions)
elif action == 'remove':
self.tun_br.delete_flows(table=constants.ARP_RESPONDER,
br.delete_flows(table=constants.ARP_RESPONDER,
proto='arp',
dl_vlan=local_vid,
nw_dst='%s' % ip)
@ -570,7 +571,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
if self.l2_pop:
# Try to remove tunnel ports if not used by other networks
for ofport in lvm.tun_ofports:
self.cleanup_tunnel_port(ofport, lvm.network_type)
self.cleanup_tunnel_port(self.tun_br, ofport,
lvm.network_type)
elif lvm.network_type == p_const.TYPE_FLAT:
if lvm.physical_network in self.phys_brs:
# outbound
@ -728,16 +730,16 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
ancillary_bridges.append(br)
return ancillary_bridges
def setup_tunnel_br(self, tun_br=None):
def setup_tunnel_br(self, tun_br_name=None):
'''Setup the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
using a patch port.
:param tun_br: the name of the tunnel bridge.
:param tun_br_name: the name of the tunnel bridge.
'''
if not self.tun_br:
self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
self.tun_br = ovs_lib.OVSBridge(tun_br_name, self.root_helper)
self.tun_br.reset_bridge()
self.patch_tun_ofport = self.int_br.add_patch_port(
@ -1011,8 +1013,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
else:
LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
def _setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
ofport = self.tun_br.add_tunnel_port(port_name,
def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
ofport = br.add_tunnel_port(port_name,
remote_ip,
self.local_ip,
tunnel_type,
@ -1032,7 +1034,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
self.tun_br_ofports[tunnel_type][remote_ip] = ofport
# Add flow in default table to resubmit to the right
# tunnelling table (lvid will be set in the latter)
self.tun_br.add_flow(priority=1,
br.add_flow(priority=1,
in_port=ofport,
actions="resubmit(,%s)" %
constants.TUN_TABLE[tunnel_type])
@ -1042,25 +1044,24 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
# Update flooding flows to include the new tunnel
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
if vlan_mapping.network_type == tunnel_type:
self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
br.mod_flow(table=constants.FLOOD_TO_TUN,
dl_vlan=vlan_mapping.vlan,
actions="strip_vlan,"
"set_tunnel:%s,output:%s" %
(vlan_mapping.segmentation_id,
ofports))
actions="strip_vlan,set_tunnel:%s,output:%s" %
(vlan_mapping.segmentation_id, ofports))
return ofport
def setup_tunnel_port(self, remote_ip, network_type):
def setup_tunnel_port(self, br, remote_ip, network_type):
remote_ip_hex = self.get_ip_in_hex(remote_ip)
if not remote_ip_hex:
return 0
port_name = '%s-%s' % (network_type, remote_ip_hex)
ofport = self._setup_tunnel_port(port_name,
ofport = self._setup_tunnel_port(br,
port_name,
remote_ip,
network_type)
return ofport
def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
# Check if this tunnel port is still used
for lvm in self.local_vlan_map.values():
if tun_ofport in lvm.tun_ofports:
@ -1071,8 +1072,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
if ofport == tun_ofport:
port_name = '%s-%s' % (tunnel_type,
self.get_ip_in_hex(remote_ip))
self.tun_br.delete_port(port_name)
self.tun_br.delete_flows(in_port=ofport)
br.delete_port(port_name)
br.delete_flows(in_port=ofport)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices, ovs_restarted):
@ -1283,7 +1284,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
return
def tunnel_sync(self):
resync = False
try:
for tunnel_type in self.tunnel_types:
details = self.plugin_rpc.tunnel_sync(self.context,
@ -1303,13 +1303,15 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
continue
tun_name = '%s-%s' % (tunnel_type,
tunnel_id or remote_ip_hex)
self._setup_tunnel_port(
tun_name, tunnel['ip_address'], tunnel_type)
self._setup_tunnel_port(self.tun_br,
tun_name,
tunnel['ip_address'],
tunnel_type)
except Exception as e:
LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
{'local_ip': self.local_ip, 'e': e})
resync = True
return resync
return True
return False
def _agent_has_updates(self, polling_manager):
return (polling_manager.is_polling_required or

View File

@ -14,6 +14,7 @@
# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
import collections
import mock
from neutron.agent import l2population_rpc
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
@ -28,19 +29,19 @@ class FakeNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin):
def fdb_remove(self, context, fdb_entries):
pass
def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
pass
def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
pass
def setup_tunnel_port(self, remote_ip, network_type):
def setup_tunnel_port(self, br, remote_ip, network_type):
pass
def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
pass
def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
pass
@ -50,6 +51,7 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
def setUp(self):
super(TestL2populationRpcCallBackTunnelMixinBase, self).setUp()
self.fakeagent = FakeNeutronAgent()
self.fakebr = mock.Mock()
Port = collections.namedtuple('Port', 'ip, ofport')
LVM = collections.namedtuple(
'LVM', 'net, vlan, phys, segid, mac, ip, vif, port')

View File

@ -260,39 +260,38 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.add_flow(**flow_dict_6)
self.br.add_flow(**flow_dict_7)
expected_calls = [
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,"
"priority=2,dl_src=ca:fe:de:ad:be:ef"
",actions=strip_vlan,output:0"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=1,actions=normal"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,actions=drop"],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,in_port=%s,actions=drop" % ofport],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
",actions=strip_vlan,output:0",
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,"
"priority=1,actions=normal",
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,"
"priority=2,actions=drop",
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,priority=2,"
"in_port=%s,actions=drop" % ofport,
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,"
"priority=4,dl_vlan=%s,in_port=%s,"
"actions=strip_vlan,set_tunnel:%s,normal"
% (vid, ofport, lsw_id)],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=3,tun_id=%s,actions="
"mod_vlan_vid:%s,output:%s"
% (lsw_id, vid, ofport)],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=4,nw_src=%s,arp,actions=drop" % cidr],
process_input=None, root_helper=self.root_helper),
% (vid, ofport, lsw_id),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,priority=3,"
"tun_id=%s,actions=mod_vlan_vid:%s,"
"output:%s" % (lsw_id, vid, ofport),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,priority=4,"
"nw_src=%s,arp,actions=drop" % cidr,
root_helper=self.root_helper),
]
self.execute.assert_has_calls(expected_calls)
@ -304,9 +303,9 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.add_flow(**flow_dict)
self.execute.assert_called_once_with(
["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=1000,idle_timeout=2000,priority=1,actions=normal"],
process_input=None,
["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=1000,idle_timeout=2000,priority=1,"
"actions=normal",
root_helper=self.root_helper)
def test_add_flow_default_priority(self):
@ -314,9 +313,9 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.add_flow(**flow_dict)
self.execute.assert_called_once_with(
["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,priority=1,actions=normal"],
process_input=None,
["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
process_input="hard_timeout=0,idle_timeout=0,priority=1,"
"actions=normal",
root_helper=self.root_helper)
def _test_get_port_ofport(self, ofport, expected_result):
@ -362,15 +361,15 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.delete_flows(tun_id=lsw_id)
self.br.delete_flows(dl_vlan=vid)
expected_calls = [
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"in_port=" + ofport],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"tun_id=%s" % lsw_id],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
"dl_vlan=%s" % vid],
process_input=None, root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
process_input="in_port=" + ofport,
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
process_input="tun_id=%s" % lsw_id,
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
process_input="dl_vlan=%s" % vid,
root_helper=self.root_helper),
]
self.execute.assert_has_calls(expected_calls)
@ -425,75 +424,6 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.mod_flow,
**params)
def test_defer_apply_flows(self):
flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
flow_expr.side_effect = ['added_flow_1', 'added_flow_2',
'deleted_flow_1']
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
self.br.defer_apply_on()
self.br.add_flow(flow='add_flow_1')
self.br.defer_apply_on()
self.br.add_flow(flow='add_flow_2')
self.br.delete_flows(flow='delete_flow_1')
self.br.defer_apply_off()
flow_expr.assert_has_calls([
mock.call({'flow': 'add_flow_1'}, 'add'),
mock.call({'flow': 'add_flow_2'}, 'add'),
mock.call({'flow': 'delete_flow_1'}, 'del')
])
run_ofctl.assert_has_calls([
mock.call('add-flows', ['-'], 'added_flow_1\nadded_flow_2\n'),
mock.call('del-flows', ['-'], 'deleted_flow_1\n')
])
def test_defer_apply_flows_concurrently(self):
flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
flow_expr.side_effect = ['added_flow_1', 'deleted_flow_1',
'modified_flow_1', 'added_flow_2',
'deleted_flow_2', 'modified_flow_2']
run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
def run_ofctl_fake(cmd, args, process_input=None):
self.br.defer_apply_on()
if cmd == 'add-flows':
self.br.add_flow(flow='added_flow_2')
elif cmd == 'del-flows':
self.br.delete_flows(flow='deleted_flow_2')
elif cmd == 'mod-flows':
self.br.mod_flow(flow='modified_flow_2')
run_ofctl.side_effect = run_ofctl_fake
self.br.defer_apply_on()
self.br.add_flow(flow='added_flow_1')
self.br.delete_flows(flow='deleted_flow_1')
self.br.mod_flow(flow='modified_flow_1')
self.br.defer_apply_off()
run_ofctl.side_effect = None
self.br.defer_apply_off()
flow_expr.assert_has_calls([
mock.call({'flow': 'added_flow_1'}, 'add'),
mock.call({'flow': 'deleted_flow_1'}, 'del'),
mock.call({'flow': 'modified_flow_1'}, 'mod'),
mock.call({'flow': 'added_flow_2'}, 'add'),
mock.call({'flow': 'deleted_flow_2'}, 'del'),
mock.call({'flow': 'modified_flow_2'}, 'mod')
])
run_ofctl.assert_has_calls([
mock.call('add-flows', ['-'], 'added_flow_1\n'),
mock.call('del-flows', ['-'], 'deleted_flow_1\n'),
mock.call('mod-flows', ['-'], 'modified_flow_1\n'),
mock.call('add-flows', ['-'], 'added_flow_2\n'),
mock.call('del-flows', ['-'], 'deleted_flow_2\n'),
mock.call('mod-flows', ['-'], 'modified_flow_2\n')
])
def test_add_tunnel_port(self):
pname = "tap99"
local_ip = "1.1.1.1"
@ -932,3 +862,111 @@ class OVS_Lib_Test(base.BaseTestCase):
data = [[["map", external_ids], "tap99", 1]]
self.assertIsNone(self._test_get_vif_port_by_id('tap99id', data,
"br-ext"))
class TestDeferredOVSBridge(base.BaseTestCase):
def setUp(self):
super(TestDeferredOVSBridge, self).setUp()
self.br = mock.Mock()
self.mocked_do_action_flows = mock.patch.object(
self.br, 'do_action_flows').start()
self.add_flow_dict1 = dict(in_port=11, actions='drop')
self.add_flow_dict2 = dict(in_port=12, actions='drop')
self.mod_flow_dict1 = dict(in_port=21, actions='drop')
self.mod_flow_dict2 = dict(in_port=22, actions='drop')
self.del_flow_dict1 = dict(in_port=31)
self.del_flow_dict2 = dict(in_port=32)
def _verify_mock_call(self, expected_calls):
self.mocked_do_action_flows.assert_has_calls(expected_calls)
self.assertEqual(len(expected_calls),
len(self.mocked_do_action_flows.mock_calls))
def test_apply_on_exit(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1]),
mock.call('mod', [self.mod_flow_dict1]),
mock.call('del', [self.del_flow_dict1]),
]
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
self._verify_mock_call([])
self._verify_mock_call(expected_calls)
def test_apply_on_exit_with_errors(self):
try:
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
raise Exception
except Exception:
self._verify_mock_call([])
else:
self.fail('Exception would be reraised')
def test_apply(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1]),
mock.call('mod', [self.mod_flow_dict1]),
mock.call('del', [self.del_flow_dict1]),
]
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
self._verify_mock_call([])
deferred_br.apply_flows()
self._verify_mock_call(expected_calls)
self._verify_mock_call(expected_calls)
def test_apply_order(self):
expected_calls = [
mock.call('del', [self.del_flow_dict1, self.del_flow_dict2]),
mock.call('mod', [self.mod_flow_dict1, self.mod_flow_dict2]),
mock.call('add', [self.add_flow_dict1, self.add_flow_dict2]),
]
order = 'del', 'mod', 'add'
with ovs_lib.DeferredOVSBridge(self.br, order=order) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict2)
deferred_br.add_flow(**self.add_flow_dict2)
deferred_br.mod_flow(**self.mod_flow_dict2)
self._verify_mock_call(expected_calls)
def test_apply_full_ordered(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1]),
mock.call('mod', [self.mod_flow_dict1]),
mock.call('del', [self.del_flow_dict1, self.del_flow_dict2]),
mock.call('add', [self.add_flow_dict2]),
mock.call('mod', [self.mod_flow_dict2]),
]
with ovs_lib.DeferredOVSBridge(self.br,
full_ordered=True) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict2)
deferred_br.add_flow(**self.add_flow_dict2)
deferred_br.mod_flow(**self.mod_flow_dict2)
self._verify_mock_call(expected_calls)
def test_getattr_unallowed_attr(self):
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
self.assertEqual(self.br.add_port, deferred_br.add_port)
def test_getattr_unallowed_attr(self):
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
self.assertRaises(AttributeError, getattr, deferred_br, 'failure')

View File

@ -69,15 +69,15 @@ class TestL2populationRpcCallBackTunnelMixin(
mock.patch.object(self.fakeagent, 'setup_tunnel_port'),
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
self.fakeagent.fdb_add_tun('context', self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
self.lvm1, self.ports[1].ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -90,17 +90,17 @@ class TestL2populationRpcCallBackTunnelMixin(
return_value=ofport),
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
self.fakeagent.fdb_add_tun('context', self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
mock_setup_tunnel_port.assert_called_once_with(
self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
self.lvm1, ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
self.ports[1].ip, self.lvm1, ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -112,15 +112,15 @@ class TestL2populationRpcCallBackTunnelMixin(
return_value=0),
mock.patch.object(self.fakeagent, 'add_fdb_flow'),
) as (mock_setup_tunnel_port, mock_add_fdb_flow):
self.fakeagent.fdb_add_tun('context', self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
mock_setup_tunnel_port.assert_called_once_with(
self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -128,15 +128,15 @@ class TestL2populationRpcCallBackTunnelMixin(
def test_fdb_remove_tun(self):
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
self.lvm1, self.ports[1].ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -147,32 +147,33 @@ class TestL2populationRpcCallBackTunnelMixin(
mock.patch.object(self.fakeagent, 'del_fdb_flow'),
mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'),
) as (mock_del_fdb_flow, mock_cleanup_tunnel_port):
self.fakeagent.fdb_remove_tun('context', self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]],
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr,
[n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]],
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
mock_cleanup_tunnel_port.assert_called_once_with(
self.ports[1].ofport, self.lvm1.network_type)
self.fakebr, self.ports[1].ofport, self.lvm1.network_type)
def test_fdb_remove_tun_non_existence_key_in_ofports(self):
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.agent_ports, self.ofports)
expected = [
mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
self.lvm1, self.ports[0].ofport),
mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
self.lvm1, self.ports[2].ofport),
mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -192,20 +193,21 @@ class TestL2populationRpcCallBackTunnelMixin(
def test__fdb_chg_ip(self):
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', self.upd_fdb_entry1_val,
self.local_ip, self.local_vlan_map1)
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
self.upd_fdb_entry1_val, self.local_ip,
self.local_vlan_map1)
expected = [
mock.call('remove', self.lvm1.vlan, self.lvms[0].mac,
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call('add', self.lvm1.vlan, self.lvms[1].mac,
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call('remove', self.lvm1.vlan, self.lvms[0].mac,
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call('add', self.lvm1.vlan, self.lvms[1].mac,
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call('remove', self.lvm2.vlan, self.lvms[0].mac,
mock.call(self.fakebr, 'remove', self.lvm2.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call('add', self.lvm2.vlan, self.lvms[2].mac,
mock.call(self.fakebr, 'add', self.lvm2.vlan, self.lvms[2].mac,
self.lvms[2].ip),
]
m_setup_entry_for_arp_reply.assert_has_calls(expected, any_order=True)
@ -214,7 +216,7 @@ class TestL2populationRpcCallBackTunnelMixin(
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun(
'context', self.upd_fdb_entry1, self.local_ip, {})
'context', self.fakebr, self.upd_fdb_entry1, self.local_ip, {})
self.assertFalse(m_setup_entry_for_arp_reply.call_count)
def test__fdb_chg_ip_ip_is_local_ip(self):
@ -228,6 +230,7 @@ class TestL2populationRpcCallBackTunnelMixin(
}
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', upd_fdb_entry_val,
self.local_ip, self.local_vlan_map1)
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
upd_fdb_entry_val, self.local_ip,
self.local_vlan_map1)
self.assertFalse(m_setup_entry_for_arp_reply.call_count)

View File

@ -736,7 +736,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
fdb_entry[self.lvms[0].net]['ports'][tunnel_ip] = [['mac', 'ip']]
self.agent.fdb_add(None, fdb_entry)
add_tun_fn.assert_called_with(
tun_name, tunnel_ip, self.tunnel_type)
self.agent.tun_br, tun_name, tunnel_ip, self.tunnel_type)
def test_fdb_del_port(self):
self._prepare_l2_pop_ofports()
@ -831,7 +831,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
mock.patch.object(self.mod_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
'gre-1', 'remote_ip', p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
@ -848,7 +848,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
mock.patch.object(self.mod_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
'gre-1', 'remote_ip', p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)

View File

@ -931,7 +931,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
return_value='6'),
mock.patch.object(self.agent.tun_br, "add_flow")
) as (add_tun_port_fn, add_flow_fn):
self.agent._setup_tunnel_port('portname', '1.2.3.4', 'vxlan')
self.agent._setup_tunnel_port(self.agent.tun_br, 'portname',
'1.2.3.4', 'vxlan')
self.assertTrue(add_tun_port_fn.called)
def test_port_unbound(self):
@ -996,7 +997,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
[[FAKE_MAC, FAKE_IP1],
n_const.FLOODING_ENTRY]}}}
with mock.patch.object(self.agent.tun_br,
"defer_apply_on") as defer_fn:
"deferred") as defer_fn:
self.agent.fdb_add(None, fdb_entry)
self.assertFalse(defer_fn.called)
@ -1013,33 +1014,36 @@ class TestOvsNeutronAgent(base.BaseTestCase):
[[FAKE_MAC, FAKE_IP1],
n_const.FLOODING_ENTRY]}}}
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_flow'),
mock.patch.object(self.agent.tun_br, 'mod_flow'),
mock.patch.object(self.agent.tun_br, 'deferred'),
mock.patch.object(self.agent.tun_br, 'do_action_flows'),
mock.patch.object(self.agent, '_setup_tunnel_port'),
) as (add_flow_fn, mod_flow_fn, add_tun_fn):
) as (deferred_fn, do_action_flows_fn, add_tun_fn):
deferred_fn.return_value = ovs_lib.DeferredOVSBridge(
self.agent.tun_br)
self.agent.fdb_add(None, fdb_entry)
self.assertFalse(add_tun_fn.called)
actions = (constants.ARP_RESPONDER_ACTIONS %
{'mac': netaddr.EUI(FAKE_MAC, dialect=netaddr.mac_unix),
'ip': netaddr.IPAddress(FAKE_IP1)})
add_flow_fn.assert_has_calls([
mock.call(table=constants.ARP_RESPONDER,
expected_calls = [
mock.call('add', [dict(table=constants.ARP_RESPONDER,
priority=1,
proto='arp',
dl_vlan='vlan1',
nw_dst=FAKE_IP1,
actions=actions),
mock.call(table=constants.UCAST_TO_TUN,
dict(table=constants.UCAST_TO_TUN,
priority=2,
dl_vlan='vlan1',
dl_dst=FAKE_MAC,
actions='strip_vlan,'
'set_tunnel:seg1,output:2')
])
mod_flow_fn.assert_called_with(table=constants.FLOOD_TO_TUN,
'set_tunnel:seg1,output:2')]),
mock.call('mod', [dict(table=constants.FLOOD_TO_TUN,
dl_vlan='vlan1',
actions='strip_vlan,'
'set_tunnel:seg1,output:1,2')
'set_tunnel:seg1,output:1,2')]),
]
do_action_flows_fn.assert_has_calls(expected_calls)
def test_fdb_del_flows(self):
self._prepare_l2_pop_ofports()
@ -1051,23 +1055,27 @@ class TestOvsNeutronAgent(base.BaseTestCase):
[[FAKE_MAC, FAKE_IP1],
n_const.FLOODING_ENTRY]}}}
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'mod_flow'),
mock.patch.object(self.agent.tun_br, 'delete_flows'),
) as (mod_flow_fn, del_flow_fn):
mock.patch.object(self.agent.tun_br, 'deferred'),
mock.patch.object(self.agent.tun_br, 'do_action_flows'),
) as (deferred_fn, do_action_flows_fn):
deferred_fn.return_value = ovs_lib.DeferredOVSBridge(
self.agent.tun_br)
self.agent.fdb_remove(None, fdb_entry)
mod_flow_fn.assert_called_with(table=constants.FLOOD_TO_TUN,
expected_calls = [
mock.call('mod', [dict(table=constants.FLOOD_TO_TUN,
dl_vlan='vlan2',
actions='strip_vlan,'
'set_tunnel:seg2,output:1')
expected = [mock.call(table=constants.ARP_RESPONDER,
'set_tunnel:seg2,output:1')]),
mock.call('del', [dict(table=constants.ARP_RESPONDER,
proto='arp',
dl_vlan='vlan2',
nw_dst=FAKE_IP1),
mock.call(table=constants.UCAST_TO_TUN,
dict(table=constants.UCAST_TO_TUN,
dl_vlan='vlan2',
dl_dst=FAKE_MAC),
mock.call(in_port='2')]
del_flow_fn.assert_has_calls(expected)
dict(in_port='2')]),
]
do_action_flows_fn.assert_has_calls(expected_calls)
def test_fdb_add_port(self):
self._prepare_l2_pop_ofports()
@ -1076,15 +1084,18 @@ class TestOvsNeutronAgent(base.BaseTestCase):
'segment_id': 'tun1',
'ports': {'1.1.1.1': [[FAKE_MAC, FAKE_IP1]]}}}
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_flow'),
mock.patch.object(self.agent.tun_br, 'mod_flow'),
mock.patch.object(self.agent.tun_br, 'deferred'),
mock.patch.object(self.agent.tun_br, 'do_action_flows'),
mock.patch.object(self.agent, '_setup_tunnel_port')
) as (add_flow_fn, mod_flow_fn, add_tun_fn):
) as (deferred_fn, do_action_flows_fn, add_tun_fn):
deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
deferred_fn.return_value = deferred_br
self.agent.fdb_add(None, fdb_entry)
self.assertFalse(add_tun_fn.called)
fdb_entry['net1']['ports']['10.10.10.10'] = [[FAKE_MAC, FAKE_IP1]]
self.agent.fdb_add(None, fdb_entry)
add_tun_fn.assert_called_with('gre-0a0a0a0a', '10.10.10.10', 'gre')
add_tun_fn.assert_called_with(
deferred_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')
def test_fdb_del_port(self):
self._prepare_l2_pop_ofports()
@ -1093,11 +1104,14 @@ class TestOvsNeutronAgent(base.BaseTestCase):
'segment_id': 'tun2',
'ports': {'2.2.2.2': [n_const.FLOODING_ENTRY]}}}
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'delete_flows'),
mock.patch.object(self.agent.tun_br, 'deferred'),
mock.patch.object(self.agent.tun_br, 'do_action_flows'),
mock.patch.object(self.agent.tun_br, 'delete_port')
) as (del_flow_fn, del_port_fn):
) as (deferred_fn, do_action_flows_fn, delete_port_fn):
deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
deferred_fn.return_value = deferred_br
self.agent.fdb_remove(None, fdb_entry)
del_port_fn.assert_called_once_with('gre-02020202')
delete_port_fn.assert_called_once_with('gre-02020202')
def test_fdb_update_chg_ip(self):
self._prepare_l2_pop_ofports()
@ -1107,23 +1121,30 @@ class TestOvsNeutronAgent(base.BaseTestCase):
{'before': [[FAKE_MAC, FAKE_IP1]],
'after': [[FAKE_MAC, FAKE_IP2]]}}}}
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_flow'),
mock.patch.object(self.agent.tun_br, 'delete_flows')
) as (add_flow_fn, del_flow_fn):
mock.patch.object(self.agent.tun_br, 'deferred'),
mock.patch.object(self.agent.tun_br, 'do_action_flows'),
) as (deferred_fn, do_action_flows_fn):
deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
deferred_fn.return_value = deferred_br
self.agent.fdb_update(None, fdb_entries)
actions = (constants.ARP_RESPONDER_ACTIONS %
{'mac': netaddr.EUI(FAKE_MAC, dialect=netaddr.mac_unix),
'ip': netaddr.IPAddress(FAKE_IP2)})
add_flow_fn.assert_called_once_with(table=constants.ARP_RESPONDER,
expected_calls = [
mock.call('add', [dict(table=constants.ARP_RESPONDER,
priority=1,
proto='arp',
dl_vlan='vlan1',
nw_dst=FAKE_IP2,
actions=actions)
del_flow_fn.assert_called_once_with(table=constants.ARP_RESPONDER,
actions=actions)]),
mock.call('del', [dict(table=constants.ARP_RESPONDER,
proto='arp',
dl_vlan='vlan1',
nw_dst=FAKE_IP1)
nw_dst=FAKE_IP1)])
]
do_action_flows_fn.assert_has_calls(expected_calls)
self.assertEqual(len(expected_calls),
len(do_action_flows_fn.mock_calls))
def test_recl_lv_port_to_preserve(self):
self._prepare_l2_pop_ofports()
@ -1191,7 +1212,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
'gre-1', 'remote_ip', p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
@ -1208,7 +1229,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
'gre-1', 'remote_ip', p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
@ -1228,7 +1249,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
) as (add_tunnel_port_fn, log_error_fn):
self.agent.dont_fragment = False
ofport = self.agent._setup_tunnel_port(
'gre-1', 'remote_ip', p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
@ -1247,7 +1268,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
self.agent.tunnel_types = ['gre']
self.agent.tunnel_sync()
expected_calls = [mock.call('gre-42', '100.101.102.103', 'gre')]
expected_calls = [mock.call(self.agent.tun_br, 'gre-42',
'100.101.102.103', 'gre')]
_setup_tunnel_port_fn.assert_has_calls(expected_calls)
def test_tunnel_sync_with_ml2_plugin(self):
@ -1259,7 +1281,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
expected_calls = [mock.call('vxlan-64651f0f',
expected_calls = [mock.call(self.agent.tun_br, 'vxlan-64651f0f',
'100.101.31.15', 'vxlan')]
_setup_tunnel_port_fn.assert_has_calls(expected_calls)
@ -1273,7 +1295,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
_setup_tunnel_port_fn.assert_called_once_with('vxlan-64646464',
_setup_tunnel_port_fn.assert_called_once_with(self.agent.tun_br,
'vxlan-64646464',
'100.100.100.100',
'vxlan')
@ -1285,7 +1308,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
self.agent.tunnel_types = ['gre']
self.agent.l2_pop = False
self.agent.tunnel_update(context=None, **kwargs)
expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
expected_calls = [
mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
def test_ovs_restart(self):