Fix line endings from CRLF to LF.

Fixes bug #1100234

Change-Id: Idb28558547b69ed4ad64334651405dc70fa9ddb3
This commit is contained in:
David Ripton 2013-01-16 09:14:19 -05:00
parent 20894c305c
commit d8b0f10ec7
5 changed files with 340 additions and 340 deletions

View File

@ -1,16 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,16 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,113 +1,113 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for Windows Hyper-V virtual switch quantum driver
"""
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for Windows Hyper-V virtual switch quantum driver
"""
import sys
import mock
import unittest2 as unittest
from quantum.openstack.common import cfg
from quantum.plugins.hyperv.agent import hyperv_quantum_agent
class TestHyperVQuantumAgent(unittest.TestCase):
def setUp(self):
self.addCleanup(cfg.CONF.reset)
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
self.agent = hyperv_quantum_agent.HyperVQuantumAgent()
self.agent.plugin_rpc = mock.Mock()
self.agent.context = mock.Mock()
self.agent.agent_id = mock.Mock()
self.agent._utils = mock.Mock()
def tearDown(self):
cfg.CONF.reset()
def test_port_bound(self):
port = mock.Mock()
net_uuid = 'my-net-uuid'
with mock.patch.object(
self.agent._utils, 'connect_vnic_to_vswitch'):
with mock.patch.object(
self.agent._utils, 'set_vswitch_port_vlan_id'):
self.agent._port_bound(port, net_uuid, 'vlan', None, None)
def test_port_unbound(self):
map = {
'network_type': 'vlan',
'vswitch_name': 'fake-vswitch',
'ports': [],
'vlan_id': 1}
net_uuid = 'my-net-uuid'
network_vswitch_map = (net_uuid, map)
with mock.patch.object(self.agent,
'_get_network_vswitch_map_by_port_id',
return_value=network_vswitch_map):
with mock.patch.object(
self.agent._utils,
'disconnect_switch_port'):
self.agent._port_unbound(net_uuid)
def test_treat_devices_added_returns_true_for_missing_device(self):
attrs = {'get_device_details.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_added([{}]))
def mock_treat_devices_added(self, details, func_name):
"""
:param details: the details to return for the device
:param func_name: the function that should be called
:returns: whether the named function was called
"""
attrs = {'get_device_details.return_value': details}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, func_name) as func:
self.assertFalse(self.agent._treat_devices_added([{}]))
return func.called
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self.mock_treat_devices_added(details,
'_treat_vif_port'))
def test_treat_devices_removed_returns_true_for_missing_device(self):
attrs = {'update_device_down.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_removed([{}]))
def mock_treat_devices_removed(self, port_exists):
details = dict(exists=port_exists)
attrs = {'update_device_down.return_value': details}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, '_port_unbound') as func:
self.assertFalse(self.agent._treat_devices_removed([{}]))
self.assertEqual(func.called, not port_exists)
def test_treat_devices_removed_unbinds_port(self):
self.mock_treat_devices_removed(False)
def test_treat_devices_removed_ignores_missing_port(self):
self.mock_treat_devices_removed(False)
class TestHyperVQuantumAgent(unittest.TestCase):
def setUp(self):
self.addCleanup(cfg.CONF.reset)
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
self.agent = hyperv_quantum_agent.HyperVQuantumAgent()
self.agent.plugin_rpc = mock.Mock()
self.agent.context = mock.Mock()
self.agent.agent_id = mock.Mock()
self.agent._utils = mock.Mock()
def tearDown(self):
cfg.CONF.reset()
def test_port_bound(self):
port = mock.Mock()
net_uuid = 'my-net-uuid'
with mock.patch.object(
self.agent._utils, 'connect_vnic_to_vswitch'):
with mock.patch.object(
self.agent._utils, 'set_vswitch_port_vlan_id'):
self.agent._port_bound(port, net_uuid, 'vlan', None, None)
def test_port_unbound(self):
map = {
'network_type': 'vlan',
'vswitch_name': 'fake-vswitch',
'ports': [],
'vlan_id': 1}
net_uuid = 'my-net-uuid'
network_vswitch_map = (net_uuid, map)
with mock.patch.object(self.agent,
'_get_network_vswitch_map_by_port_id',
return_value=network_vswitch_map):
with mock.patch.object(
self.agent._utils,
'disconnect_switch_port'):
self.agent._port_unbound(net_uuid)
def test_treat_devices_added_returns_true_for_missing_device(self):
attrs = {'get_device_details.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_added([{}]))
def mock_treat_devices_added(self, details, func_name):
"""
:param details: the details to return for the device
:param func_name: the function that should be called
:returns: whether the named function was called
"""
attrs = {'get_device_details.return_value': details}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, func_name) as func:
self.assertFalse(self.agent._treat_devices_added([{}]))
return func.called
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self.mock_treat_devices_added(details,
'_treat_vif_port'))
def test_treat_devices_removed_returns_true_for_missing_device(self):
attrs = {'update_device_down.side_effect': Exception()}
self.agent.plugin_rpc.configure_mock(**attrs)
self.assertTrue(self.agent._treat_devices_removed([{}]))
def mock_treat_devices_removed(self, port_exists):
details = dict(exists=port_exists)
attrs = {'update_device_down.return_value': details}
self.agent.plugin_rpc.configure_mock(**attrs)
with mock.patch.object(self.agent, '_port_unbound') as func:
self.assertFalse(self.agent._treat_devices_removed([{}]))
self.assertEqual(func.called, not port_exists)
def test_treat_devices_removed_unbinds_port(self):
self.mock_treat_devices_removed(False)
def test_treat_devices_removed_ignores_missing_port(self):
self.mock_treat_devices_removed(False)

View File

@ -1,88 +1,88 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from quantum import context
from quantum.extensions import portbindings
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.tests.unit import test_db_plugin as test_plugin
class HyperVQuantumPluginTestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = ('quantum.plugins.hyperv.'
'hyperv_quantum_plugin.HyperVQuantumPlugin')
def setUp(self):
super(HyperVQuantumPluginTestCase, self).setUp(self._plugin_name)
class TestHyperVVirtualSwitchBasicGet(
test_plugin.TestBasicGet, HyperVQuantumPluginTestCase):
pass
class TestHyperVVirtualSwitchV2HTTPResponse(
test_plugin.TestV2HTTPResponse, HyperVQuantumPluginTestCase):
pass
class TestHyperVVirtualSwitchPortsV2(
test_plugin.TestPortsV2, HyperVQuantumPluginTestCase):
def test_port_vif_details(self):
plugin = QuantumManager.get_plugin()
with self.port(name='name') as port:
port_id = port['port']['id']
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = plugin.get_port(ctx, port_id)
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
def test_ports_vif_details(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
plugin = QuantumManager.get_plugin()
with contextlib.nested(self.port(), self.port()) as (port1, port2):
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for port in ports:
self.assertEqual(port['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
class TestHyperVVirtualSwitchNetworksV2(
test_plugin.TestNetworksV2, HyperVQuantumPluginTestCase):
pass
class HyperVQuantumPluginTestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = ('quantum.plugins.hyperv.'
'hyperv_quantum_plugin.HyperVQuantumPlugin')
def setUp(self):
super(HyperVQuantumPluginTestCase, self).setUp(self._plugin_name)
class TestHyperVVirtualSwitchBasicGet(
test_plugin.TestBasicGet, HyperVQuantumPluginTestCase):
pass
class TestHyperVVirtualSwitchV2HTTPResponse(
test_plugin.TestV2HTTPResponse, HyperVQuantumPluginTestCase):
pass
class TestHyperVVirtualSwitchPortsV2(
test_plugin.TestPortsV2, HyperVQuantumPluginTestCase):
def test_port_vif_details(self):
plugin = QuantumManager.get_plugin()
with self.port(name='name') as port:
port_id = port['port']['id']
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = plugin.get_port(ctx, port_id)
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
def test_ports_vif_details(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
plugin = QuantumManager.get_plugin()
with contextlib.nested(self.port(), self.port()) as (port1, port2):
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for port in ports:
self.assertEqual(port['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
class TestHyperVVirtualSwitchNetworksV2(
test_plugin.TestNetworksV2, HyperVQuantumPluginTestCase):
pass

View File

@ -1,126 +1,126 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for hyperv quantum rpc
"""
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions SRL
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for hyperv quantum rpc
"""
import mock
import unittest2
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.plugins.hyperv import agent_notifier_api as ana
from quantum.plugins.hyperv.common import constants
class rpcHyperVApiTestCase(unittest2.TestCase):
def _test_hyperv_quantum_api(
self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
rpc_method_mock = mock.Mock()
rpc_method_mock.return_value = expected_retval
setattr(rpc, rpc_method, rpc_method_mock)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(rpc_method_mock.call_args[0],
expected_args):
self.assertEqual(arg, expected_arg)
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port',
network_type='fake_network_type',
segmentation_id='fake_segmentation_id',
physical_network='fake_physical_network')
def test_port_delete(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.DELETE),
'port_delete', rpc_method='fanout_cast',
port_id='port_id')
def test_tunnel_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
constants.TUNNEL,
topics.UPDATE),
'tunnel_update', rpc_method='fanout_cast',
tunnel_ip='fake_ip', tunnel_id='fake_id')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip')
class rpcHyperVApiTestCase(unittest2.TestCase):
def _test_hyperv_quantum_api(
self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
rpc_method_mock = mock.Mock()
rpc_method_mock.return_value = expected_retval
setattr(rpc, rpc_method, rpc_method_mock)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(rpc_method_mock.call_args[0],
expected_args):
self.assertEqual(arg, expected_arg)
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port',
network_type='fake_network_type',
segmentation_id='fake_segmentation_id',
physical_network='fake_physical_network')
def test_port_delete(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.DELETE),
'port_delete', rpc_method='fanout_cast',
port_id='port_id')
def test_tunnel_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_quantum_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
constants.TUNNEL,
topics.UPDATE),
'tunnel_update', rpc_method='fanout_cast',
tunnel_ip='fake_ip', tunnel_id='fake_id')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_quantum_api(
rpcapi, topics.PLUGIN,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip')