# Copyright (C) 2014 VA Linux Systems Japan K.K.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.

import collections
import contextlib

import mock

from neutron.openstack.common import importutils
from neutron.tests.unit.ofagent import ofa_test_base


_OFALIB_NAME = 'neutron.plugins.ofagent.agent.arp_lib'


class OFAAgentTestCase(ofa_test_base.OFAAgentTestBase):
    """Common fixtures (networks, packet mocks, packet-in event) for tests."""

    def setUp(self):
        """Build the fake networks, packet/protocol mocks and the event."""
        super(OFAAgentTestCase, self).setUp()

        # Two hosts on network 10 and one host on network 20.
        Net = collections.namedtuple('Net', 'net, mac, ip')
        self.nets = [Net(net=10, mac='11:11:11:44:55:66', ip='10.1.2.20'),
                     Net(net=10, mac='11:11:11:44:55:67', ip='10.1.2.21'),
                     Net(net=20, mac='22:22:22:44:55:66', ip='10.2.2.20')]

        # Stand-ins for the packet and for each protocol header it can hold.
        self.packet_mod = mock.Mock()
        self.proto_ethernet_mod = mock.Mock()
        self.proto_vlan_mod = mock.Mock()
        self.proto_vlan_mod.vid = self.nets[0].net
        self.proto_arp_mod = mock.Mock()
        self.fake_get_protocol = mock.Mock(return_value=self.proto_vlan_mod)
        self.packet_mod.get_protocol = self.fake_get_protocol
        self.fake_add_protocol = mock.Mock()
        self.packet_mod.add_protocol = self.fake_add_protocol

        self.arp = importutils.import_module('ryu.lib.packet.arp')
        self.ethernet = importutils.import_module('ryu.lib.packet.ethernet')
        self.vlan = importutils.import_module('ryu.lib.packet.vlan')
        # Every Packet(...) constructed from here on is self.packet_mod.
        mock.patch('ryu.lib.packet.packet.Packet',
                   return_value=self.packet_mod).start()

        self.ryuapp = 'ryuapp'
        self.inport = '1'
        self.ev = mock.Mock()
        self.datapath = self._mk_test_dp('tun_br')
        self.ofproto = importutils.import_module('ryu.ofproto.ofproto_v1_3')
        self.ofpp = mock.Mock()
        self.datapath.ofproto = self.ofproto
        self.datapath.ofproto_parser = self.ofpp
        self.OFPActionOutput = mock.Mock()
        self.OFPActionOutput.return_value = 'OFPActionOutput'
        self.ofpp.OFPActionOutput = self.OFPActionOutput

        # Packet-in message wrapped by the event handed to the handler.
        self.msg = mock.Mock()
        self.msg.datapath = self.datapath
        self.msg.buffer_id = self.ofproto.OFP_NO_BUFFER
        self.msg_data = 'test_message_data'
        self.msg.data = self.msg_data
        self.ev.msg = self.msg
        self.msg.match = {'in_port': self.inport}


class TestArpLib(OFAAgentTestCase):
    """Unit tests for ArpLib from neutron.plugins.ofagent.agent.arp_lib."""

    def setUp(self):
        """Create the ArpLib under test and install the protocol lookup."""
        super(TestArpLib, self).setUp()

        self.mod_arplib = importutils.import_module(_OFALIB_NAME)
        self.arplib = self.mod_arplib.ArpLib(self.ryuapp)
        self.packet_mod.get_protocol = self._fake_get_protocol
        # Each flag controls whether _fake_get_protocol finds that header.
        self._fake_get_protocol_ethernet = True
        self._fake_get_protocol_vlan = True
        self._fake_get_protocol_arp = True

    def _net0_arp_tbl(self):
        """Return a fresh ARP table holding only the first fake host."""
        first = self.nets[0]
        return {first.net: {first.ip: first.mac}}

    def _patch_packet_in_collaborators(self, respond_arp_result):
        """Patch the three methods packet_in_handler may delegate to.

        :param respond_arp_result: value the patched _respond_arp returns.
        :returns: a context manager yielding the tuple
                  (respond_arp mock, add_flow mock, send_unknown mock).
        """
        return contextlib.nested(
            mock.patch.object(self.arplib, '_respond_arp',
                              return_value=respond_arp_result),
            mock.patch.object(self.arplib,
                              '_add_flow_to_avoid_unknown_packet'),
            mock.patch.object(self.arplib, '_send_unknown_packet'))

    def test__send_unknown_packet_no_buffer(self):
        """Without a switch buffer the raw message data is sent along."""
        in_port = 3
        out_port = self.ofproto.OFPP_TABLE
        self.msg.buffer_id = self.ofproto.OFP_NO_BUFFER
        self.arplib._send_unknown_packet(self.msg, in_port, out_port)
        actions = [self.ofpp.OFPActionOutput(self.ofproto.OFPP_TABLE, 0)]
        self.ofpp.OFPPacketOut.assert_called_once_with(
            datapath=self.datapath,
            buffer_id=self.msg.buffer_id,
            in_port=in_port,
            actions=actions,
            data=self.msg_data)

    def test__send_unknown_packet_existence_buffer(self):
        """With a switch buffer id the packet-out carries no data."""
        in_port = 3
        out_port = self.ofproto.OFPP_TABLE
        self.msg.buffer_id = 256
        self.arplib._send_unknown_packet(self.msg, in_port, out_port)
        actions = [self.ofpp.OFPActionOutput(self.ofproto.OFPP_TABLE, 0)]
        self.ofpp.OFPPacketOut.assert_called_once_with(
            datapath=self.datapath,
            buffer_id=self.msg.buffer_id,
            in_port=in_port,
            actions=actions,
            data=None)

    def test__respond_arp(self):
        """A request for a known IP builds and sends an ARP reply."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        port = 3
        arptbl = self.arplib._arp_tbl[self.nets[0].net]
        # Use throw-away mocks as the received headers.  Aliasing the
        # imported ryu modules here would make the attribute assignments
        # below leak into objects shared with every other test.
        pkt_ethernet = mock.Mock()
        pkt_vlan = mock.Mock()
        pkt_arp = mock.Mock()
        pkt_arp.opcode = self.arp.ARP_REQUEST
        pkt_arp.dst_ip = self.nets[0].ip
        with mock.patch.object(
            self.arplib, '_send_arp_reply'
        ) as send_arp_rep_fn:
            self.assertTrue(
                self.arplib._respond_arp(self.datapath, port, arptbl,
                                         pkt_ethernet, pkt_vlan, pkt_arp))
        ethernet_ethernet = self.ethernet.ethernet(
            ethertype=pkt_ethernet.ethertype,
            dst=pkt_ethernet.src,
            src=self.nets[0].mac)
        vlan_vlan = self.vlan.vlan(cfi=pkt_vlan.cfi,
                                   ethertype=pkt_vlan.ethertype,
                                   pcp=pkt_vlan.pcp,
                                   vid=pkt_vlan.vid)
        arp_arp = self.arp.arp(opcode=self.arp.ARP_REPLY,
                               src_mac=self.nets[0].mac,
                               src_ip=pkt_arp.dst_ip,
                               dst_mac=pkt_arp.src_mac,
                               dst_ip=pkt_arp.src_ip)
        self.fake_add_protocol.assert_has_calls([mock.call(ethernet_ethernet),
                                                 mock.call(vlan_vlan),
                                                 mock.call(arp_arp)])
        send_arp_rep_fn.assert_called_once_with(
            self.datapath, port, self.packet_mod)

    def _test__respond_arp(self, pkt_arp):
        """Assert that _respond_arp declines to answer *pkt_arp*."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        port = 3
        arptbl = self.arplib._arp_tbl[self.nets[0].net]
        pkt_ethernet = mock.Mock()
        pkt_vlan = mock.Mock()
        self.assertFalse(
            self.arplib._respond_arp(self.datapath, port, arptbl,
                                     pkt_ethernet, pkt_vlan, pkt_arp))

    def test__respond_arp_non_arp_req(self):
        """An ARP packet that is not a request is not answered."""
        pkt_arp = mock.Mock()
        pkt_arp.opcode = self.arp.ARP_REPLY
        self._test__respond_arp(pkt_arp)

    def test__respond_arp_ip_not_found_in_arptable(self):
        """A request for an IP missing from the table is not answered."""
        pkt_arp = mock.Mock()
        pkt_arp.opcode = self.arp.ARP_REQUEST
        pkt_arp.dst_ip = self.nets[1].ip
        self._test__respond_arp(pkt_arp)

    def test_add_arp_table_entry(self):
        """Adding one entry creates the network and its ip-to-mac map."""
        self.arplib.add_arp_table_entry(self.nets[0].net,
                                        self.nets[0].ip, self.nets[0].mac)
        self.assertEqual(
            self.arplib._arp_tbl,
            {self.nets[0].net: {self.nets[0].ip: self.nets[0].mac}})

    def test_add_arp_table_entry_multiple_net(self):
        """Entries on different networks are stored per network."""
        self.arplib.add_arp_table_entry(self.nets[0].net,
                                        self.nets[0].ip, self.nets[0].mac)
        self.arplib.add_arp_table_entry(self.nets[2].net,
                                        self.nets[2].ip, self.nets[2].mac)
        self.assertEqual(
            self.arplib._arp_tbl,
            {self.nets[0].net: {self.nets[0].ip: self.nets[0].mac},
             self.nets[2].net: {self.nets[2].ip: self.nets[2].mac}})

    def test_add_arp_table_entry_multiple_ip(self):
        """Two IPs on the same network share that network's map."""
        self.arplib.add_arp_table_entry(self.nets[0].net,
                                        self.nets[0].ip, self.nets[0].mac)
        self.arplib.add_arp_table_entry(self.nets[0].net,
                                        self.nets[1].ip, self.nets[1].mac)
        self.assertEqual(
            self.arplib._arp_tbl,
            {self.nets[0].net: {self.nets[0].ip: self.nets[0].mac,
                                self.nets[1].ip: self.nets[1].mac}})

    def test_del_arp_table_entry(self):
        """Deleting the only entry leaves an empty table."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        self.arplib.del_arp_table_entry(self.nets[0].net, self.nets[0].ip)
        self.assertEqual(self.arplib._arp_tbl, {})

    def test_del_arp_table_entry_multiple_net(self):
        """Deleting a network's last entry keeps the other networks."""
        self.arplib._arp_tbl = {
            self.nets[0].net: {self.nets[0].ip: self.nets[0].mac},
            self.nets[2].net: {self.nets[2].ip: self.nets[2].mac}}
        self.arplib.del_arp_table_entry(self.nets[0].net, self.nets[0].ip)
        self.assertEqual(
            self.arplib._arp_tbl,
            {self.nets[2].net: {self.nets[2].ip: self.nets[2].mac}})

    def test_del_arp_table_entry_multiple_ip(self):
        """Deleting one IP keeps the network's remaining entries."""
        self.arplib._arp_tbl = {
            self.nets[0].net: {self.nets[0].ip: self.nets[0].mac,
                               self.nets[1].ip: self.nets[1].mac}}
        self.arplib.del_arp_table_entry(self.nets[0].net, self.nets[1].ip)
        self.assertEqual(
            self.arplib._arp_tbl,
            {self.nets[0].net: {self.nets[0].ip: self.nets[0].mac}})

    def _fake_get_protocol(self, net_type):
        """Stand-in for Packet.get_protocol driven by the per-test flags.

        :param net_type: protocol class being looked up.
        :returns: the matching protocol mock, or None when that protocol's
                  flag is off or *net_type* is not one of the three known
                  protocol classes.
        """
        known_protocols = (
            (self.ethernet.ethernet, self._fake_get_protocol_ethernet,
             self.proto_ethernet_mod),
            (self.vlan.vlan, self._fake_get_protocol_vlan,
             self.proto_vlan_mod),
            (self.arp.arp, self._fake_get_protocol_arp,
             self.proto_arp_mod),
        )
        for proto_cls, present, proto_mod in known_protocols:
            if net_type == proto_cls:
                # First match decides; a disabled protocol yields None.
                return proto_mod if present else None
        return None

    def test_packet_in_handler(self):
        """An answered ARP request triggers no flow add and no forward."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        with self._patch_packet_in_collaborators(True) as (
                res_arp_fn, add_flow_fn, send_unknown_pk_fn):
            self.arplib.packet_in_handler(self.ev)
        self.assertFalse(add_flow_fn.call_count)
        self.assertFalse(send_unknown_pk_fn.call_count)
        res_arp_fn.assert_called_once_with(
            self.datapath, self.inport,
            self.arplib._arp_tbl[self.nets[0].net],
            self.proto_ethernet_mod, self.proto_vlan_mod, self.proto_arp_mod)

    def _test_packet_in_handler(self):
        """Assert the handler calls none of its three collaborators."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        with self._patch_packet_in_collaborators(True) as (
                res_arp_fn, add_flow_fn, send_unknown_pk_fn):
            self.arplib.packet_in_handler(self.ev)
        self.assertFalse(add_flow_fn.call_count)
        self.assertFalse(send_unknown_pk_fn.call_count)
        self.assertFalse(res_arp_fn.call_count)

    def test_packet_in_handler_non_ethernet(self):
        """A packet with no ethernet header is ignored."""
        self._fake_get_protocol_ethernet = False
        self._test_packet_in_handler()

    def test_packet_in_handler_non_vlan(self):
        """A packet with no vlan header is ignored."""
        self._fake_get_protocol_vlan = False
        self._test_packet_in_handler()

    def test_packet_in_handler_non_arp(self):
        """A packet with no arp header is ignored."""
        self._fake_get_protocol_arp = False
        self._test_packet_in_handler()

    def test_packet_in_handler_unknown_network(self):
        """An unanswered request adds a flow and forwards the packet."""
        self.arplib._arp_tbl = self._net0_arp_tbl()
        with self._patch_packet_in_collaborators(False) as (
                res_arp_fn, add_flow_fn, send_unknown_pk_fn):
            self.arplib.packet_in_handler(self.ev)
        add_flow_fn.assert_called_once_with(
            self.datapath,
            self.datapath.ofproto_parser.OFPMatch(
                eth_type=self.ethernet.ETH_TYPE_ARP,
                vlan_vid=self.proto_vlan_mod.vid |
                self.datapath.ofproto.OFPVID_PRESENT,
                arp_op=self.arp.ARP_REQUEST,
                arp_tpa=self.proto_arp_mod.dst_ip))
        send_unknown_pk_fn.assert_called_once_with(
            self.ev.msg, self.msg.match['in_port'],
            self.datapath.ofproto.OFPP_TABLE)
        res_arp_fn.assert_called_once_with(
            self.datapath, self.inport,
            self.arplib._arp_tbl[self.nets[0].net],
            self.proto_ethernet_mod, self.proto_vlan_mod, self.proto_arp_mod)