Merge "Improve test coverage in NEC plugin"

This commit is contained in:
Jenkins 2013-06-08 06:30:58 +00:00 committed by Gerrit Code Review
commit 7e0f9d2b1b
6 changed files with 810 additions and 55 deletions

View File

@ -117,8 +117,9 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base,
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_nec = NECPluginV2RPCCallbacks(self)
self.callback_sg = SecurityGroupServerRpcCallback()
callbacks = [NECPluginV2RPCCallbacks(self),
callbacks = [self.callback_nec,
DhcpRpcCallback(), L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]

View File

@ -0,0 +1,307 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import itertools
import time
import mock
from oslo.config import cfg
import testtools
from quantum.agent.linux import ovs_lib
from quantum.extensions import securitygroup as ext_sg
from quantum.plugins.nec.agent import nec_quantum_agent
from quantum.tests import base
DAEMON_LOOP_COUNT = 10
OVS_DPID = '00000629355b6943'
OVS_DPID_0X = '0x' + OVS_DPID
class TestNecAgentBase(base.BaseTestCase):
def setUp(self):
super(TestNecAgentBase, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.addCleanup(mock.patch.stopall)
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('host', 'dummy-host')
with contextlib.nested(
mock.patch.object(ovs_lib.OVSBridge, 'get_datapath_id',
return_value=OVS_DPID),
mock.patch('socket.gethostname', return_value='dummy-host'),
mock.patch('quantum.openstack.common.loopingcall.'
'FixedIntervalLoopingCall'),
mock.patch('quantum.agent.rpc.PluginReportStateAPI')
) as (get_datapath_id, gethostname,
loopingcall, state_rpc_api):
kwargs = {'integ_br': 'integ_br',
'root_helper': 'dummy_wrapper',
'polling_interval': 1}
self.agent = nec_quantum_agent.NECQuantumAgent(**kwargs)
self.loopingcall = loopingcall
self.state_rpc_api = state_rpc_api
class TestNecAgent(TestNecAgentBase):
def test_daemon_loop(self):
def state_check(index):
self.assertEqual(len(self.vif_ports_scenario[index]),
len(self.agent.cur_ports))
# Fake time.sleep to stop the infinite loop in daemon_loop()
self.sleep_count = 0
def sleep_mock(*args, **kwargs):
state_check(self.sleep_count)
self.sleep_count += 1
if self.sleep_count >= DAEMON_LOOP_COUNT:
raise RuntimeError()
vif_ports = [ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
self.agent.int_br),
ovs_lib.VifPort('port2', '2', 'id-2', 'mac-2',
self.agent.int_br)]
self.vif_ports_scenario = [[], [], vif_ports[0:1], vif_ports[0:2],
vif_ports[1:2], []]
# Ensure vif_ports_scenario is longer than DAEMON_LOOP_COUNT
if len(self.vif_ports_scenario) < DAEMON_LOOP_COUNT:
self.vif_ports_scenario.extend(
[] for _i in xrange(DAEMON_LOOP_COUNT -
len(self.vif_ports_scenario)))
with contextlib.nested(
mock.patch.object(time, 'sleep', side_effect=sleep_mock),
mock.patch.object(ovs_lib.OVSBridge, 'get_vif_ports'),
mock.patch.object(nec_quantum_agent.NECPluginApi, 'update_ports'),
mock.patch.object(self.agent.sg_agent, 'prepare_devices_filter'),
mock.patch.object(self.agent.sg_agent, 'remove_devices_filter')
) as (sleep, get_vif_potrs, update_ports,
prepare_devices_filter, remove_devices_filter):
get_vif_potrs.side_effect = self.vif_ports_scenario
with testtools.ExpectedException(RuntimeError):
self.agent.daemon_loop()
self.assertEqual(update_ports.call_count, 4)
self.assertEqual(sleep.call_count, DAEMON_LOOP_COUNT)
agent_id = 'nec-q-agent.dummy-host'
expected = [
mock.call(mock.ANY, agent_id, OVS_DPID_0X,
[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'}],
[]),
mock.call(mock.ANY, agent_id, OVS_DPID_0X,
[{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
[]),
mock.call(mock.ANY, agent_id, OVS_DPID_0X,
[], ['id-1']),
mock.call(mock.ANY, agent_id, OVS_DPID_0X,
[], ['id-2'])
]
update_ports.assert_has_calls(expected)
expected = [mock.call(['id-1']),
mock.call(['id-2'])]
self.assertEqual(prepare_devices_filter.call_count, 2)
prepare_devices_filter.assert_has_calls(expected)
self.assertEqual(remove_devices_filter.call_count, 2)
remove_devices_filter.assert_has_calls(expected)
sleep.assert_called_with(self.agent.polling_interval)
def test_report_state_installed(self):
self.loopingcall.assert_called_once_with(self.agent._report_state)
instance = self.loopingcall.return_value
instance.start.assert_called_once_with(interval=4)
def _check_report_state(self, cur_ports, num_ports, fail_mode,
first=False):
self.assertEqual(first or fail_mode,
'start_flag' in self.agent.agent_state)
self.agent.cur_ports = cur_ports
self.agent._report_state()
self.assertEqual(fail_mode,
'start_flag' in self.agent.agent_state)
self.assertEqual(self.agent.
agent_state['configurations']['devices'],
num_ports)
self.num_ports_hist.append(num_ports)
def _test_report_state(self, fail_mode):
log_mocked = mock.patch.object(nec_quantum_agent, 'LOG')
log_patched = log_mocked.start()
def record_state(*args, **kwargs):
self.record_calls.append(copy.deepcopy(args))
if fail_mode:
raise Exception()
self.record_calls = []
self.num_ports_hist = []
state_rpc = self.state_rpc_api.return_value
state_rpc.report_state.side_effect = record_state
dummy_vif = ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1', None)
self.state_rpc_api.assert_called_once_with('q-plugin')
self.assertTrue('start_flag' in self.agent.agent_state)
self._check_report_state([], 0, fail_mode, first=True)
self._check_report_state([dummy_vif] * 2, 2, fail_mode)
self._check_report_state([dummy_vif] * 5, 5, fail_mode)
self._check_report_state([], 0, fail_mode)
print 'record_state', self.record_calls
print 'num_ports_hist', self.num_ports_hist
# Since loopingcall start is mocked, call_count is same as
# the call count of check_report_state.
self.assertEqual(state_rpc.report_state.call_count, 4)
self.assertEqual(len(self.record_calls), 4)
for i, x in enumerate(itertools.izip(self.record_calls,
self.num_ports_hist)):
rec, num_ports = x
expected_state = {
'binary': 'quantum-nec-agent',
'host': 'dummy-host',
'topic': 'N/A',
'configurations': {'devices': 0},
'agent_type': 'NEC plugin agent'}
expected_state['configurations']['devices'] = num_ports
if i == 0 or fail_mode:
expected_state['start_flag'] = True
self.assertEqual(expected_state, rec[1])
self.assertEqual(fail_mode, log_patched.exception.called)
def test_report_state(self):
self._test_report_state(fail_mode=False)
def test_report_state_fail(self):
self._test_report_state(fail_mode=True)
class TestNecAgentCallback(TestNecAgentBase):
def test_port_update(self):
with contextlib.nested(
mock.patch.object(ovs_lib.OVSBridge, 'get_vif_port_by_id'),
mock.patch.object(self.agent.sg_agent, 'refresh_firewall')
) as (get_vif_port_by_id, refresh_firewall):
context = mock.Mock()
vifport = ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
self.agent.int_br)
# The OVS port does not exist.
get_vif_port_by_id.return_value = None
port = {'id': 'update-port-1'}
self.agent.callback_nec.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 1)
self.assertFalse(refresh_firewall.call_count)
# The OVS port exists but no security group is associated.
get_vif_port_by_id.return_value = vifport
port = {'id': 'update-port-1'}
self.agent.callback_nec.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 2)
self.assertFalse(refresh_firewall.call_count)
# The OVS port exists but a security group is associated.
get_vif_port_by_id.return_value = vifport
port = {'id': 'update-port-1',
ext_sg.SECURITYGROUPS: ['default']}
self.agent.callback_nec.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 3)
self.assertEqual(refresh_firewall.call_count, 1)
get_vif_port_by_id.return_value = None
port = {'id': 'update-port-1',
ext_sg.SECURITYGROUPS: ['default']}
self.agent.callback_nec.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 4)
self.assertEqual(refresh_firewall.call_count, 1)
class TestNecAgentPluginApi(TestNecAgentBase):
def _test_plugin_api(self, expected_failure=False):
with contextlib.nested(
mock.patch.object(nec_quantum_agent.NECPluginApi, 'make_msg'),
mock.patch.object(nec_quantum_agent.NECPluginApi, 'call'),
mock.patch.object(nec_quantum_agent, 'LOG')
) as (make_msg, apicall, log):
agent_id = 'nec-q-agent.dummy-host'
if expected_failure:
apicall.side_effect = Exception()
self.agent.plugin_rpc.update_ports(
mock.sentinel.ctx, agent_id, OVS_DPID_0X,
# port_added
[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
# port_removed
['id-3', 'id-4', 'id-5'])
make_msg.assert_called_once_with(
'update_ports', topic='q-agent-notifier',
agent_id=agent_id, datapath_id=OVS_DPID_0X,
port_added=[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
port_removed=['id-3', 'id-4', 'id-5'])
apicall.assert_called_once_with(mock.sentinel.ctx,
make_msg.return_value)
self.assertTrue(log.info.called)
if expected_failure:
self.assertTrue(log.warn.called)
def test_plugin_api(self):
self._test_plugin_api()
def test_plugin_api_fail(self):
self._test_plugin_api(expected_failure=True)
class TestNecAgentMain(base.BaseTestCase):
def test_main(self):
with contextlib.nested(
mock.patch.object(nec_quantum_agent, 'NECQuantumAgent'),
mock.patch('eventlet.monkey_patch'),
mock.patch('quantum.common.config'),
mock.patch.object(nec_quantum_agent, 'config')
) as (agent, eventlet, logging_config, cfg):
cfg.CONF.ovs.integration_bridge = 'br-int-x'
cfg.CONF.AGENT.root_helper = 'dummy-helper'
cfg.CONF.AGENT.polling_interval = 10
nec_quantum_agent.main()
self.assertTrue(eventlet.called)
self.assertTrue(logging_config.setup_logging.called)
agent.assert_has_calls([
mock.call('br-int-x', 'dummy-helper', 10),
mock.call().daemon_loop()
])

View File

@ -19,6 +19,7 @@ from quantum.common import topics
from quantum import context as q_context
from quantum.extensions import portbindings
from quantum import manager
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec import nec_plugin
from quantum.tests.unit import _test_extension_portbindings as test_bindings
@ -26,16 +27,24 @@ from quantum.tests.unit import test_db_plugin as test_plugin
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
OFC_MANAGER = 'quantum.plugins.nec.nec_plugin.ofc_manager.OFCManager'
PLUGIN_NAME = 'quantum.plugins.nec.nec_plugin.NECPluginV2'
OFC_MANAGER = 'quantum.plugins.nec.nec_plugin.ofc_manager.OFCManager'
OFC_DRIVER = 'quantum.tests.unit.nec.stub_ofc_driver.StubOFCDriver'
class NecPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = PLUGIN_NAME
PACKET_FILTER_ENABLE = False
def setUp(self):
self.addCleanup(mock.patch.stopall)
ofc_manager_cls = mock.patch(OFC_MANAGER).start()
ofc_driver = ofc_manager_cls.return_value.driver
ofc_driver.filter_supported.return_value = self.PACKET_FILTER_ENABLE
super(NecPluginV2TestCase, self).setUp(self._plugin_name)
self.context = q_context.get_admin_context()
self.plugin = manager.QuantumManager.get_plugin()
class TestNecBasicGet(test_plugin.TestBasicGet, NecPluginV2TestCase):
@ -76,19 +85,13 @@ class TestNecPortBindingNoSG(TestNecPortBinding):
class TestNecPortsV2Callback(NecPluginV2TestCase):
def setUp(self):
self.addCleanup(mock.patch.stopall)
ofc_manager_p = mock.patch(OFC_MANAGER)
ofc_manager_cls = ofc_manager_p.start()
self.ofc = mock.Mock()
ofc_manager_cls.return_value = self.ofc
super(TestNecPortsV2Callback, self).setUp()
self.callbacks = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
self.ofc = self.plugin.ofc
self.ofc_port_exists = False
self._setup_side_effects()
super(TestNecPortsV2Callback, self).setUp()
self.context = q_context.get_admin_context()
self.plugin = manager.QuantumManager.get_plugin()
self.callbacks = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
def _setup_side_effects(self):
def _create_ofc_port_called(*args, **kwargs):
self.ofc_port_exists = True
@ -192,3 +195,297 @@ class TestNecPortsV2Callback(NecPluginV2TestCase):
self.ofc.assert_has_calls(expected)
self.assertEqual(2, self.ofc.create_ofc_port.call_count)
self.assertEqual(1, self.ofc.delete_ofc_port.call_count)
class TestNecPluginDbTest(NecPluginV2TestCase):
def test_update_resource(self):
with self.network() as network:
self.assertEqual("ACTIVE", network['network']['status'])
net_id = network['network']['id']
for status in ["DOWN", "BUILD", "ERROR", "ACTIVE"]:
self.plugin._update_resource_status(
self.context, 'network', net_id,
getattr(nec_plugin.OperationalStatus, status))
n = self.plugin._get_network(self.context, net_id)
self.assertEqual(status, n.status)
class TestNecPluginOfcManager(NecPluginV2TestCase):
def setUp(self):
super(TestNecPluginOfcManager, self).setUp()
self.ofc = self.plugin.ofc
def _update_resource(self, resource, id, data):
collection = resource + 's'
data = {resource: data}
req = self.new_update_request(collection, data, id)
res = self.deserialize(self.fmt, req.get_response(self.api))
return res[resource]
def _show_resource(self, resource, id):
collection = resource + 's'
req = self.new_show_request(collection, id)
res = self.deserialize(self.fmt, req.get_response(self.api))
return res[resource]
def _list_resource(self, resource):
collection = resource + 's'
req = self.new_list_request(collection)
res = req.get_response(self.api)
return res[collection]
def _delete_resource(self, resource, id):
collection = resource + 's'
req = self.new_delete_request(collection, id)
res = req.get_response(self.api)
return res.status_int
def test_create_network(self):
self.ofc.exists_ofc_tenant.return_value = False
net = None
ctx = mock.ANY
with self.network() as network:
net = network['network']
self.assertEqual(network['network']['status'], 'ACTIVE')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_create_network_with_admin_state_down(self):
self.ofc.exists_ofc_tenant.return_value = False
net = None
ctx = mock.ANY
with self.network(admin_state_up=False) as network:
net = network['network']
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_create_two_network(self):
self.ofc.exists_ofc_tenant.side_effect = [False, True]
nets = []
ctx = mock.ANY
with self.network() as net1:
nets.append(net1['network'])
self.assertEqual(net1['network']['status'], 'ACTIVE')
with self.network() as net2:
nets.append(net2['network'])
self.assertEqual(net2['network']['status'], 'ACTIVE')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, nets[0]['id'],
nets[0]['name']),
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, nets[1]['id'],
nets[1]['name']),
mock.call.delete_ofc_network(ctx, nets[1]['id'], mock.ANY),
mock.call.delete_ofc_network(ctx, nets[0]['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_create_network_fail(self):
self.ofc.exists_ofc_tenant.return_value = False
self.ofc.create_ofc_network.side_effect = nexc.OFCException(
reason='hoge')
net = None
ctx = mock.ANY
with self.network() as network:
net = network['network']
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_update_network(self):
self.ofc.exists_ofc_tenant.return_value = False
net = None
ctx = mock.ANY
with self.network() as network:
net = network['network']
self.assertEqual(network['network']['status'], 'ACTIVE')
# Set admin_state_up to False
res = self._update_resource('network', net['id'],
{'admin_state_up': False})
self.assertFalse(res['admin_state_up'])
# Set admin_state_up to True
res = self._update_resource('network', net['id'],
{'admin_state_up': True})
self.assertTrue(res['admin_state_up'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def _rpcapi_update_ports(self, agent_id='nec-q-agent.fake',
datapath_id="0xabc", added=[], removed=[]):
kwargs = {'topic': topics.AGENT,
'agent_id': agent_id,
'datapath_id': datapath_id,
'port_added': added, 'port_removed': removed}
self.plugin.callback_nec.update_ports(self.context, **kwargs)
def test_create_port_no_ofc_creation(self):
self.ofc.exists_ofc_tenant.return_value = False
self.ofc.exists_ofc_port.return_value = False
net = None
p1 = None
ctx = mock.ANY
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
p1 = port['port']
net_id = port['port']['network_id']
net = self._show_resource('network', net_id)
self.assertEqual(net['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'ACTIVE')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_create_port_with_ofc_creation(self):
self.ofc.exists_ofc_tenant.return_value = False
self.ofc.exists_ofc_port.side_effect = [False, True]
net = None
p1 = None
ctx = mock.ANY
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
p1 = port['port']
net_id = port['port']['network_id']
net = self._show_resource('network', net_id)
self.assertEqual(net['status'], 'ACTIVE')
self.assertEqual(p1['status'], 'ACTIVE')
# Check the port is not created on OFC
self.assertFalse(self.ofc.create_ofc_port.call_count)
# Register portinfo, then the port is created on OFC
portinfo = {'id': p1['id'], 'port_no': 123}
self._rpcapi_update_ports(added=[portinfo])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.create_ofc_port(ctx, p1['id'], mock.ANY),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.delete_ofc_port(ctx, p1['id'], mock.ANY),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)
def test_update_port(self):
self._test_update_port_with_admin_state(resource='port')
def test_update_network_with_ofc_port(self):
self._test_update_port_with_admin_state(resource='network')
def _test_update_port_with_admin_state(self, resource='port'):
self.ofc.exists_ofc_tenant.return_value = False
self.ofc.exists_ofc_port.side_effect = [False, True, False]
net = None
p1 = None
ctx = mock.ANY
if resource == 'network':
net_ini_admin_state = False
port_ini_admin_state = True
else:
net_ini_admin_state = True
port_ini_admin_state = False
with self.network(admin_state_up=net_ini_admin_state) as network:
with self.subnet(network=network) as subnet:
with self.port(subnet=subnet,
admin_state_up=port_ini_admin_state) as port:
p1 = port['port']
net_id = port['port']['network_id']
res_id = net_id if resource == 'network' else p1['id']
net = self._show_resource('network', net_id)
# Check the port is not created on OFC
self.assertFalse(self.ofc.create_ofc_port.call_count)
# Register portinfo, then the port is created on OFC
portinfo = {'id': p1['id'], 'port_no': 123}
self._rpcapi_update_ports(added=[portinfo])
self.assertFalse(self.ofc.create_ofc_port.call_count)
self._update_resource(resource, res_id,
{'admin_state_up': True})
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertFalse(self.ofc.delete_ofc_port.call_count)
self._update_resource(resource, res_id,
{'admin_state_up': False})
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
net['name']),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.create_ofc_port(ctx, p1['id'], mock.ANY),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.delete_ofc_port(ctx, p1['id'], mock.ANY),
mock.call.exists_ofc_port(ctx, p1['id']),
mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
mock.call.delete_ofc_tenant(ctx, self._tenant_id)
]
self.ofc.assert_has_calls(expected)

View File

@ -166,6 +166,9 @@ class PFCDriverTestBase(base.BaseTestCase):
self.driver.delete_port(port_path)
self.mox.VerifyAll()
def test_filter_supported(self):
self.assertFalse(self.driver.filter_supported())
class PFCDriverBaseTest(PFCDriverTestBase):
pass
@ -185,7 +188,14 @@ class PFCV3DriverTest(PFCDriverTestBase):
self.assertEqual(ofc_t_path, ret)
def testc_delete_tenant(self):
pass
t, n, p = self.get_ofc_item_random_params()
path = "/tenants/%s" % _ofc(t)
# There is no API call.
self.mox.ReplayAll()
self.driver.delete_tenant(path)
self.mox.VerifyAll()
class PFCV4DriverTest(PFCDriverTestBase):

View File

@ -23,30 +23,30 @@ from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.nec.db import api as ndb # noqa
from quantum.tests.unit.nec import test_nec_plugin
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
PLUGIN_NAME = test_nec_plugin.PLUGIN_NAME
OFC_MANAGER = test_nec_plugin.OFC_MANAGER
AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
mock.patch(NOTIFIER).start()
mock.patch(OFC_MANAGER).start()
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
self.notifier = manager.QuantumManager.get_plugin().notifier
def tearDown(self):
super(NecSecurityGroupsTestCase, self).tearDown()

View File

@ -15,6 +15,8 @@
# under the License.
# @author: Ryota MIBU
import random
import mox
from quantum import context
@ -48,14 +50,40 @@ class TremaDriverTestBase(base.BaseTestCase):
tenant_id = uuidutils.generate_uuid()
network_id = uuidutils.generate_uuid()
port_id = uuidutils.generate_uuid()
mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
portinfo = nmodels.PortInfo(id=port_id, datapath_id="0x123456789",
port_no=1234, vlan_id=321,
mac="11:22:33:44:55:66")
mac=mac)
return tenant_id, network_id, portinfo
class TremaDriverNetworkTestBase(TremaDriverTestBase):
def test_create_tenant(self):
t, n, p = self.get_ofc_item_random_params()
# There is no API call.
self.mox.ReplayAll()
ret = self.driver.create_tenant('dummy_desc', t)
self.mox.VerifyAll()
ofc_t_path = "/tenants/%s" % t
self.assertEqual(ofc_t_path, ret)
def test_update_tenant(self):
t, n, p = self.get_ofc_item_random_params()
path = "/tenants/%s" % t
# There is no API call.
self.mox.ReplayAll()
self.driver.update_tenant(path, 'dummy_desc')
self.mox.VerifyAll()
def testc_delete_tenant(self):
t, n, p = self.get_ofc_item_random_params()
path = "/tenants/%s" % t
# There is no API call.
self.mox.ReplayAll()
self.driver.delete_tenant(path)
self.mox.VerifyAll()
def testa_create_network(self):
t, n, p = self.get_ofc_item_random_params()
description = "desc of %s" % n
@ -83,6 +111,9 @@ class TremaPortBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_port"
def test_filter_supported(self):
self.assertTrue(self.driver.filter_supported())
def testd_create_port(self):
_t, n, p = self.get_ofc_item_random_params()
@ -114,6 +145,9 @@ class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_portmac"
def test_filter_supported(self):
self.assertTrue(self.driver.filter_supported())
def testd_create_port(self):
t, n, p = self.get_ofc_item_random_params()
dummy_port = "dummy-%s" % p.id
@ -154,6 +188,9 @@ class TremaMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_mac"
def test_filter_supported(self):
self.assertFalse(self.driver.filter_supported())
def testd_create_port(self):
t, n, p = self.get_ofc_item_random_params()
@ -179,38 +216,43 @@ class TremaMACBaseDriverTest(TremaDriverNetworkTestBase):
class TremaFilterDriverTest(TremaDriverTestBase):
def _test_create_filter(self, filter_dict=None, filter_post=None,
filter_wildcards=None, no_portinfo=False):
t, n, p = self.get_ofc_item_random_params()
src_mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
if filter_wildcards is None:
filter_wildcards = []
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test."""
t, n, p = (super(TremaFilterDriverTest, self).
get_ofc_item_random_params())
filter_id = uuidutils.generate_uuid()
filter_dict = {'tenant_id': t,
'id': filter_id,
'network_id': n,
'priority': 123,
'action': "ACCEPT",
'in_port': p.id,
'src_mac': p.mac,
'dst_mac': "",
'eth_type': 0,
'src_cidr': "",
'dst_cidr': "",
'src_port': 0,
'dst_port': 0,
'protocol': "TCP",
'admin_state_up': True,
'status': "ACTIVE"}
filter_item = nmodels.PacketFilter(**filter_dict)
return t, n, p, filter_item
def testa_create_filter(self):
t, n, p, f = self.get_ofc_item_random_params()
f = {'tenant_id': t,
'id': uuidutils.generate_uuid(),
'network_id': n,
'priority': 123,
'action': "ACCEPT",
'in_port': p.id,
'src_mac': src_mac,
'dst_mac': "",
'eth_type': 0,
'src_cidr': "",
'dst_cidr': "",
'src_port': 0,
'dst_port': 0,
'protocol': "TCP",
'admin_state_up': True,
'status': "ACTIVE"}
if filter_dict:
f.update(filter_dict)
print 'filter=%s' % f
net_path = "/networks/%s" % n
ofp_wildcards = 'dl_vlan,dl_vlan_pcp,nw_tos,dl_dst,' + \
'nw_src:32,nw_dst:32,tp_src,tp_dst'
body = {'id': f.id,
all_wildcards_ofp = ['dl_vlan', 'dl_vlan_pcp', 'nw_tos',
'in_port', 'dl_src', 'dl_dst',
'nw_src', 'nw_dst',
'dl_type', 'nw_proto',
'tp_src', 'tp_dst']
all_wildcards_non_ofp = ['in_datapath_id', 'slice']
body = {'id': f['id'],
'action': 'ALLOW',
'priority': 123,
'slice': n,
@ -218,19 +260,117 @@ class TremaFilterDriverTest(TremaDriverTestBase):
'in_port': 1234,
'nw_proto': '0x6',
'dl_type': '0x800',
'dl_src': p.mac,
'ofp_wildcards': ofp_wildcards}
'dl_src': src_mac}
if filter_post:
body.update(filter_post)
if no_portinfo:
filter_wildcards += ['in_datapath_id', 'in_port']
p = None
for field in filter_wildcards:
if field in body:
del body[field]
ofp_wildcards = ["%s:32" % _f if _f in ['nw_src', 'nw_dst'] else _f
for _f in all_wildcards_ofp if _f not in body]
body['ofp_wildcards'] = ','.join(ofp_wildcards)
non_ofp_wildcards = [_f for _f in all_wildcards_non_ofp
if _f not in body]
if non_ofp_wildcards:
body['wildcards'] = ','.join(non_ofp_wildcards)
ofc_client.OFCClient.do_request("POST", "/filters", body=body)
self.mox.ReplayAll()
ret = self.driver.create_filter(net_path, f, p, f.id)
ret = self.driver.create_filter(net_path, f, p, f['id'])
self.mox.VerifyAll()
self.assertEqual(ret, '/filters/%s' % f.id)
self.assertEqual(ret, '/filters/%s' % f['id'])
def test_create_filter_accept(self):
self._test_create_filter(filter_dict={'action': 'ACCEPT'})
def test_create_filter_allow(self):
self._test_create_filter(filter_dict={'action': 'ALLOW'})
def test_create_filter_deny(self):
self._test_create_filter(filter_dict={'action': 'DENY'},
filter_post={'action': 'DENY'})
def test_create_filter_drop(self):
self._test_create_filter(filter_dict={'action': 'DROP'},
filter_post={'action': 'DENY'})
def test_create_filter_no_port(self):
self._test_create_filter(no_portinfo=True)
def test_create_filter_src_mac_wildcard(self):
self._test_create_filter(filter_dict={'src_mac': ''},
filter_wildcards=['dl_src'])
def test_create_filter_dst_mac(self):
dst_mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
self._test_create_filter(filter_dict={'dst_mac': dst_mac},
filter_post={'dl_dst': dst_mac})
def test_create_filter_src_cidr(self):
src_cidr = '10.2.0.0/24'
self._test_create_filter(filter_dict={'src_cidr': src_cidr},
filter_post={'nw_src': src_cidr})
def test_create_filter_dst_cidr(self):
dst_cidr = '192.168.10.0/24'
self._test_create_filter(filter_dict={'dst_cidr': dst_cidr},
filter_post={'nw_dst': dst_cidr})
def test_create_filter_proto_icmp(self):
self._test_create_filter(
filter_dict={'protocol': 'icmp'},
filter_post={'dl_type': '0x800', 'nw_proto': '0x1'})
def test_create_filter_proto_tcp(self):
self._test_create_filter(
filter_dict={'protocol': 'tcp'},
filter_post={'dl_type': '0x800', 'nw_proto': '0x6'})
def test_create_filter_proto_udp(self):
self._test_create_filter(
filter_dict={'protocol': 'udp'},
filter_post={'dl_type': '0x800', 'nw_proto': '0x11'})
def test_create_filter_proto_arp(self):
self._test_create_filter(
filter_dict={'protocol': 'arp'},
filter_post={'dl_type': '0x806'},
filter_wildcards=['nw_proto'])
def test_create_filter_proto_misc(self):
self._test_create_filter(
filter_dict={'protocol': '0x33', 'eth_type': '0x900'},
filter_post={'dl_type': '0x900', 'nw_proto': '0x33'})
def test_create_filter_proto_misc_dl_type_wildcard(self):
self._test_create_filter(
filter_dict={'protocol': '0x33', 'ether_type': ''},
filter_post={'nw_proto': '0x33'},
filter_wildcards=['dl_type'])
def test_create_filter_proto_wildcard(self):
self._test_create_filter(
filter_dict={'protocol': ''},
filter_wildcards=['dl_type', 'nw_proto'])
def test_create_filter_src_dst_port(self):
self._test_create_filter(filter_dict={'src_port': 8192,
'dst_port': 4096},
filter_post={'tp_src': '0x2000',
'tp_dst': '0x1000'})
def testb_delete_filter(self):
t, n, p, f = self.get_ofc_item_random_params()
t, n, p = self.get_ofc_item_random_params()
f_path = "/filters/%s" % f.id
f_path = "/filters/%s" % uuidutils.generate_uuid()
ofc_client.OFCClient.do_request("DELETE", f_path)
self.mox.ReplayAll()