Merge "Add the option to minimize ovs l2 polling"
This commit is contained in:
commit
ddb5769371
@ -87,6 +87,9 @@
|
||||
# Agent's polling interval in seconds
|
||||
# polling_interval = 2
|
||||
|
||||
# Minimize polling by monitoring ovsdb for interface changes
|
||||
# minimize_polling = False
|
||||
|
||||
# (ListOpt) The types of tenant network tunnels supported by the agent.
|
||||
# Setting this will enable tunneling support in the agent. This can be set to
|
||||
# either 'gre' or 'vxlan'. If this is unset, it will default to [] and
|
||||
|
@ -13,6 +13,8 @@
|
||||
# from the old mechanism
|
||||
ovs-vsctl: CommandFilter, ovs-vsctl, root
|
||||
ovs-ofctl: CommandFilter, ovs-ofctl, root
|
||||
kill_ovsdb_client: KillFilter, root, /usr/bin/ovsdb-client, -9
|
||||
ovsdb-client: CommandFilter, ovsdb-client, root
|
||||
xe: CommandFilter, xe, root
|
||||
|
||||
# ip_lib
|
||||
|
@ -70,9 +70,12 @@ class AsyncProcess(object):
|
||||
self.respawn_interval = respawn_interval
|
||||
self._process = None
|
||||
self._kill_event = None
|
||||
self._reset_queues()
|
||||
self._watchers = []
|
||||
|
||||
def _reset_queues(self):
|
||||
self._stdout_lines = eventlet.queue.LightQueue()
|
||||
self._stderr_lines = eventlet.queue.LightQueue()
|
||||
self._watchers = []
|
||||
|
||||
def start(self):
|
||||
"""Launch a process and monitor it asynchronously."""
|
||||
|
114
neutron/agent/linux/ovsdb_monitor.py
Normal file
114
neutron/agent/linux/ovsdb_monitor.py
Normal file
@ -0,0 +1,114 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OvsdbMonitor(async_process.AsyncProcess):
|
||||
"""Manages an invocation of 'ovsdb-client monitor'."""
|
||||
|
||||
def __init__(self, table_name, columns=None, format=None,
|
||||
root_helper=None, respawn_interval=None):
|
||||
|
||||
cmd = ['ovsdb-client', 'monitor', table_name]
|
||||
if columns:
|
||||
cmd.append(','.join(columns))
|
||||
if format:
|
||||
cmd.append('--format=%s' % format)
|
||||
super(OvsdbMonitor, self).__init__(cmd,
|
||||
root_helper=root_helper,
|
||||
respawn_interval=respawn_interval)
|
||||
|
||||
def _read_stdout(self):
|
||||
data = self._process.stdout.readline()
|
||||
if not data:
|
||||
return
|
||||
#TODO(marun) The default root helper outputs exit errors to
|
||||
# stdout due to bug #1219530. This check can be moved to
|
||||
# _read_stderr once the error is correctly output to stderr.
|
||||
if self.root_helper and self.root_helper in data:
|
||||
self._stderr_lines.put(data)
|
||||
LOG.error(_('Error received from ovsdb monitor: %s') % data)
|
||||
else:
|
||||
self._stdout_lines.put(data)
|
||||
LOG.debug(_('Output received from ovsdb monitor: %s') % data)
|
||||
return data
|
||||
|
||||
def _read_stderr(self):
|
||||
data = super(OvsdbMonitor, self)._read_stderr()
|
||||
if data:
|
||||
LOG.error(_('Error received from ovsdb monitor: %s') % data)
|
||||
# Do not return value to ensure that stderr output will
|
||||
# stop the monitor.
|
||||
|
||||
|
||||
class SimpleInterfaceMonitor(OvsdbMonitor):
|
||||
"""Monitors the Interface table of the local host's ovsdb for changes.
|
||||
|
||||
The has_updates() method indicates whether changes to the ovsdb
|
||||
Interface table have been detected since the monitor started or
|
||||
since the previous access.
|
||||
"""
|
||||
|
||||
def __init__(self, root_helper=None, respawn_interval=None):
|
||||
super(SimpleInterfaceMonitor, self).__init__(
|
||||
'Interface',
|
||||
columns=['name'],
|
||||
format='json',
|
||||
root_helper=root_helper,
|
||||
respawn_interval=respawn_interval,
|
||||
)
|
||||
self.data_received = False
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
return (self.data_received and
|
||||
self._kill_event and
|
||||
not self._kill_event.ready())
|
||||
|
||||
@property
|
||||
def has_updates(self):
|
||||
"""Indicate whether the ovsdb Interface table has been updated.
|
||||
|
||||
True will be returned if the monitor process is not active.
|
||||
This 'failing open' minimizes the risk of falsely indicating
|
||||
the absense of updates at the expense of potential false
|
||||
positives.
|
||||
"""
|
||||
return bool(list(self.iter_stdout())) or not self.is_active
|
||||
|
||||
def start(self, block=False, timeout=5):
|
||||
super(SimpleInterfaceMonitor, self).start()
|
||||
if block:
|
||||
eventlet.timeout.Timeout(timeout)
|
||||
while not self.is_active:
|
||||
eventlet.sleep()
|
||||
|
||||
def _kill(self, *args, **kwargs):
|
||||
self.data_received = False
|
||||
super(SimpleInterfaceMonitor, self)._kill(*args, **kwargs)
|
||||
|
||||
def _read_stdout(self):
|
||||
data = super(SimpleInterfaceMonitor, self)._read_stdout()
|
||||
if data and not self.data_received:
|
||||
self.data_received = True
|
||||
return data
|
104
neutron/agent/linux/polling.py
Normal file
104
neutron/agent/linux/polling.py
Normal file
@ -0,0 +1,104 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import eventlet
|
||||
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def get_polling_manager(minimize_polling=False, root_helper=None):
|
||||
if minimize_polling:
|
||||
pm = InterfacePollingMinimizer(root_helper=root_helper)
|
||||
pm.start()
|
||||
else:
|
||||
pm = AlwaysPoll()
|
||||
try:
|
||||
yield pm
|
||||
finally:
|
||||
if minimize_polling:
|
||||
pm.stop()
|
||||
|
||||
|
||||
class BasePollingManager(object):
|
||||
|
||||
def __init__(self):
|
||||
self._force_polling = False
|
||||
self._polling_completed = True
|
||||
|
||||
def force_polling(self):
|
||||
self._force_polling = True
|
||||
|
||||
def polling_completed(self):
|
||||
self._polling_completed = True
|
||||
|
||||
def _is_polling_required(self):
|
||||
raise NotImplemented
|
||||
|
||||
@property
|
||||
def is_polling_required(self):
|
||||
# Always consume the updates to minimize polling.
|
||||
polling_required = self._is_polling_required()
|
||||
|
||||
# Polling is required regardless of whether updates have been
|
||||
# detected.
|
||||
if self._force_polling:
|
||||
self._force_polling = False
|
||||
polling_required = True
|
||||
|
||||
# Polling is required if not yet done for previously detected
|
||||
# updates.
|
||||
if not self._polling_completed:
|
||||
polling_required = True
|
||||
|
||||
if polling_required:
|
||||
# Track whether polling has been completed to ensure that
|
||||
# polling can be required until the caller indicates via a
|
||||
# call to polling_completed() that polling has been
|
||||
# successfully performed.
|
||||
self._polling_completed = False
|
||||
|
||||
return polling_required
|
||||
|
||||
|
||||
class AlwaysPoll(BasePollingManager):
|
||||
|
||||
@property
|
||||
def is_polling_required(self):
|
||||
return True
|
||||
|
||||
|
||||
class InterfacePollingMinimizer(BasePollingManager):
|
||||
"""Monitors ovsdb to determine when polling is required."""
|
||||
|
||||
def __init__(self, root_helper=None):
|
||||
super(InterfacePollingMinimizer, self).__init__()
|
||||
self._monitor = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
root_helper=root_helper)
|
||||
|
||||
def start(self):
|
||||
self._monitor.start()
|
||||
|
||||
def stop(self):
|
||||
self._monitor.stop()
|
||||
|
||||
def _is_polling_required(self):
|
||||
# Maximize the chances of update detection having a chance to
|
||||
# collect output.
|
||||
eventlet.sleep()
|
||||
return self._monitor.has_updates
|
@ -32,6 +32,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import l2population_rpc
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ovs_lib
|
||||
from neutron.agent.linux import polling
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
@ -156,7 +157,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
def __init__(self, integ_br, tun_br, local_ip,
|
||||
bridge_mappings, root_helper,
|
||||
polling_interval, tunnel_types=None,
|
||||
veth_mtu=None, l2_population=False):
|
||||
veth_mtu=None, l2_population=False,
|
||||
minimize_polling=False):
|
||||
'''Constructor.
|
||||
|
||||
:param integ_br: name of the integration bridge.
|
||||
@ -169,6 +171,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
the agent. If set, will automatically set enable_tunneling to
|
||||
True.
|
||||
:param veth_mtu: MTU size for veth interfaces.
|
||||
:param minimize_polling: Optional, whether to minimize polling by
|
||||
monitoring ovsdb for interface changes.
|
||||
'''
|
||||
self.veth_mtu = veth_mtu
|
||||
self.root_helper = root_helper
|
||||
@ -199,6 +203,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
constants.TYPE_VXLAN: {}}
|
||||
|
||||
self.polling_interval = polling_interval
|
||||
self.minimize_polling = minimize_polling
|
||||
|
||||
if tunnel_types:
|
||||
self.enable_tunneling = True
|
||||
@ -1042,7 +1047,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
resync = True
|
||||
return resync
|
||||
|
||||
def rpc_loop(self):
|
||||
def rpc_loop(self, polling_manager=None):
|
||||
if not polling_manager:
|
||||
polling_manager = polling.AlwaysPoll()
|
||||
|
||||
sync = True
|
||||
ports = set()
|
||||
ancillary_ports = set()
|
||||
@ -1056,28 +1064,34 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
ports.clear()
|
||||
ancillary_ports.clear()
|
||||
sync = False
|
||||
polling_manager.force_polling()
|
||||
|
||||
# Notify the plugin of tunnel IP
|
||||
if self.enable_tunneling and tunnel_sync:
|
||||
LOG.info(_("Agent tunnel out of sync with plugin!"))
|
||||
tunnel_sync = self.tunnel_sync()
|
||||
|
||||
port_info = self.update_ports(ports)
|
||||
if polling_manager.is_polling_required:
|
||||
port_info = self.update_ports(ports)
|
||||
|
||||
# notify plugin about port deltas
|
||||
if port_info:
|
||||
LOG.debug(_("Agent loop has new devices!"))
|
||||
# If treat devices fails - must resync with plugin
|
||||
sync = self.process_network_ports(port_info)
|
||||
ports = port_info['current']
|
||||
|
||||
# Treat ancillary devices if they exist
|
||||
if self.ancillary_brs:
|
||||
port_info = self.update_ancillary_ports(ancillary_ports)
|
||||
# notify plugin about port deltas
|
||||
if port_info:
|
||||
rc = self.process_ancillary_network_ports(port_info)
|
||||
ancillary_ports = port_info['current']
|
||||
sync = sync | rc
|
||||
LOG.debug(_("Agent loop has new devices!"))
|
||||
# If treat devices fails - must resync with plugin
|
||||
sync = self.process_network_ports(port_info)
|
||||
ports = port_info['current']
|
||||
|
||||
# Treat ancillary devices if they exist
|
||||
if self.ancillary_brs:
|
||||
port_info = self.update_ancillary_ports(
|
||||
ancillary_ports)
|
||||
if port_info:
|
||||
rc = self.process_ancillary_network_ports(
|
||||
port_info)
|
||||
ancillary_ports = port_info['current']
|
||||
sync = sync | rc
|
||||
|
||||
polling_manager.polling_completed()
|
||||
|
||||
except Exception:
|
||||
LOG.exception(_("Error in agent event loop"))
|
||||
@ -1095,7 +1109,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
'elapsed': elapsed})
|
||||
|
||||
def daemon_loop(self):
|
||||
self.rpc_loop()
|
||||
with polling.get_polling_manager(self.minimize_polling,
|
||||
self.root_helper) as pm:
|
||||
self.rpc_loop(polling_manager=pm)
|
||||
|
||||
|
||||
def check_ovs_version(min_required_version, root_helper):
|
||||
@ -1154,6 +1170,7 @@ def create_agent_config_map(config):
|
||||
bridge_mappings=bridge_mappings,
|
||||
root_helper=config.AGENT.root_helper,
|
||||
polling_interval=config.AGENT.polling_interval,
|
||||
minimize_polling=config.AGENT.minimize_polling,
|
||||
tunnel_types=config.AGENT.tunnel_types,
|
||||
veth_mtu=config.AGENT.veth_mtu,
|
||||
l2_population=config.AGENT.l2_population,
|
||||
|
@ -62,6 +62,10 @@ agent_opts = [
|
||||
cfg.IntOpt('polling_interval', default=2,
|
||||
help=_("The number of seconds the agent will wait between "
|
||||
"polling for local device changes.")),
|
||||
cfg.BoolOpt('minimize_polling',
|
||||
default=False,
|
||||
help=_("Minimize polling by monitoring ovsdb for interface "
|
||||
"changes.")),
|
||||
cfg.ListOpt('tunnel_types', default=DEFAULT_TUNNEL_TYPES,
|
||||
help=_("Network types supported by the agent "
|
||||
"(gre and/or vxlan)")),
|
||||
|
@ -17,9 +17,11 @@
|
||||
|
||||
"""Base Test Case for all Unit Tests"""
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
import eventlet.timeout
|
||||
import fixtures
|
||||
from oslo.config import cfg
|
||||
import stubout
|
||||
@ -88,3 +90,10 @@ class BaseTestCase(testtools.TestCase):
|
||||
group = kw.pop('group', None)
|
||||
for k, v in kw.iteritems():
|
||||
CONF.set_override(k, v, group)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def assert_max_execution_time(self, max_execution_time=5):
|
||||
with eventlet.timeout.Timeout(max_execution_time, False):
|
||||
yield
|
||||
return
|
||||
self.fail('Execution of this test timed out')
|
||||
|
@ -14,10 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import eventlet
|
||||
import eventlet.timeout
|
||||
import fixtures
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
@ -43,13 +40,6 @@ class TestAsyncProcess(base.BaseTestCase):
|
||||
output += new_output
|
||||
eventlet.sleep(0.01)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def assert_max_execution_time(self, max_execution_time=5):
|
||||
with eventlet.timeout.Timeout(max_execution_time, False):
|
||||
yield
|
||||
return
|
||||
self.fail('Execution of this test timed out')
|
||||
|
||||
def test_stopping_async_process_lifecycle(self):
|
||||
with self.assert_max_execution_time():
|
||||
proc = async_process.AsyncProcess(['tail', '-f',
|
||||
|
150
neutron/tests/functional/agent/linux/test_ovsdb_monitor.py
Normal file
150
neutron/tests/functional/agent/linux/test_ovsdb_monitor.py
Normal file
@ -0,0 +1,150 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Tests in this module will be skipped unless:
|
||||
|
||||
- ovsdb-client is installed
|
||||
|
||||
- ovsdb-client can be invoked via password-less sudo
|
||||
|
||||
- OS_SUDO_TESTING is set to '1' or 'True' in the test execution
|
||||
environment
|
||||
|
||||
|
||||
The jenkins gate does not allow direct sudo invocation during test
|
||||
runs, but configuring OS_SUDO_TESTING ensures that developers are
|
||||
still able to execute tests that require the capability.
|
||||
"""
|
||||
|
||||
import os
|
||||
import random
|
||||
|
||||
import eventlet
|
||||
|
||||
from neutron.agent.linux import ovs_lib
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
def get_rand_name(name='test'):
|
||||
return name + str(random.randint(1, 0x7fffffff))
|
||||
|
||||
|
||||
def create_ovs_resource(name_prefix, creation_func):
|
||||
"""Create a new ovs resource that does not already exist.
|
||||
|
||||
:param name_prefix: The prefix for a randomly generated name
|
||||
:param creation_func: A function taking the name of the resource
|
||||
to be created. An error is assumed to indicate a name
|
||||
collision.
|
||||
"""
|
||||
while True:
|
||||
name = get_rand_name(name_prefix)
|
||||
try:
|
||||
return creation_func(name)
|
||||
except RuntimeError:
|
||||
continue
|
||||
break
|
||||
|
||||
|
||||
class BaseMonitorTest(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BaseMonitorTest, self).setUp()
|
||||
|
||||
self._check_test_requirements()
|
||||
|
||||
self.root_helper = 'sudo'
|
||||
self.ovs = ovs_lib.BaseOVS(self.root_helper)
|
||||
self.bridge = create_ovs_resource('test-br-', self.ovs.add_bridge)
|
||||
|
||||
def cleanup_bridge():
|
||||
self.bridge.destroy()
|
||||
self.addCleanup(cleanup_bridge)
|
||||
|
||||
def _check_command(self, cmd, error_text, skip_msg):
|
||||
try:
|
||||
utils.execute(cmd)
|
||||
except RuntimeError as e:
|
||||
if error_text in str(e):
|
||||
self.skipTest(skip_msg)
|
||||
raise
|
||||
|
||||
def _check_test_requirements(self):
|
||||
if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING:
|
||||
self.skipTest('testing with sudo is not enabled')
|
||||
self._check_command(['which', 'ovsdb-client'],
|
||||
'Exit code: 1',
|
||||
'ovsdb-client is not installed')
|
||||
self._check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'],
|
||||
'Exit code: 1',
|
||||
'password-less sudo not granted for ovsdb-client')
|
||||
|
||||
|
||||
class TestOvsdbMonitor(BaseMonitorTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestOvsdbMonitor, self).setUp()
|
||||
|
||||
self.monitor = ovsdb_monitor.OvsdbMonitor('Bridge',
|
||||
root_helper=self.root_helper)
|
||||
self.addCleanup(self.monitor.stop)
|
||||
self.monitor.start()
|
||||
|
||||
def collect_initial_output(self):
|
||||
while True:
|
||||
output = list(self.monitor.iter_stdout())
|
||||
if output:
|
||||
return output[0]
|
||||
eventlet.sleep(0.01)
|
||||
|
||||
def test_killed_monitor_respawns(self):
|
||||
with self.assert_max_execution_time():
|
||||
self.monitor.respawn_interval = 0
|
||||
old_pid = self.monitor._process.pid
|
||||
output1 = self.collect_initial_output()
|
||||
pid = self.monitor._get_pid_to_kill()
|
||||
self.monitor._reset_queues()
|
||||
self.monitor._kill_process(pid)
|
||||
while (self.monitor._process.pid == old_pid):
|
||||
eventlet.sleep(0.01)
|
||||
output2 = self.collect_initial_output()
|
||||
# Initial output should appear twice
|
||||
self.assertEqual(output1, output2)
|
||||
|
||||
|
||||
class TestSimpleInterfaceMonitor(BaseMonitorTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSimpleInterfaceMonitor, self).setUp()
|
||||
|
||||
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
root_helper=self.root_helper)
|
||||
self.addCleanup(self.monitor.stop)
|
||||
self.monitor.start(block=True)
|
||||
|
||||
def test_has_updates(self):
|
||||
self.assertTrue(self.monitor.has_updates,
|
||||
'Initial call should always be true')
|
||||
self.assertFalse(self.monitor.has_updates,
|
||||
'has_updates without port addition should be False')
|
||||
create_ovs_resource('test-port-', self.bridge.add_port)
|
||||
with self.assert_max_execution_time():
|
||||
# has_updates after port addition should become True
|
||||
while not self.monitor.has_updates:
|
||||
eventlet.sleep(0.01)
|
112
neutron/tests/unit/agent/linux/test_ovsdb_monitor.py
Normal file
112
neutron/tests/unit/agent/linux/test_ovsdb_monitor.py
Normal file
@ -0,0 +1,112 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet.event
|
||||
import mock
|
||||
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestOvsdbMonitor(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestOvsdbMonitor, self).setUp()
|
||||
self.root_helper = 'sudo'
|
||||
self.monitor = ovsdb_monitor.OvsdbMonitor('Interface',
|
||||
root_helper=self.root_helper)
|
||||
|
||||
def read_output_queues_and_returns_result(self, output_type, output):
|
||||
with mock.patch.object(self.monitor, '_process') as mock_process:
|
||||
with mock.patch.object(mock_process, output_type) as mock_file:
|
||||
with mock.patch.object(mock_file, 'readline') as mock_readline:
|
||||
mock_readline.return_value = output
|
||||
func = getattr(self.monitor,
|
||||
'_read_%s' % output_type,
|
||||
None)
|
||||
return func()
|
||||
|
||||
def test__read_stdout_returns_none_for_empty_read(self):
|
||||
result = self.read_output_queues_and_returns_result('stdout', '')
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test__read_stdout_queues_root_wrapper_errors_to_stderr_output(self):
|
||||
result = self.read_output_queues_and_returns_result('stdout',
|
||||
self.root_helper)
|
||||
self.assertIsNone(result)
|
||||
self.assertEqual(self.monitor._stderr_lines.get_nowait(),
|
||||
self.root_helper)
|
||||
|
||||
def test__read_stdout_queues_normal_output_to_stdout_queue(self):
|
||||
output = 'foo'
|
||||
result = self.read_output_queues_and_returns_result('stdout', output)
|
||||
self.assertEqual(result, output)
|
||||
self.assertEqual(self.monitor._stdout_lines.get_nowait(), output)
|
||||
|
||||
def test__read_stderr_returns_none(self):
|
||||
result = self.read_output_queues_and_returns_result('stderr', '')
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestSimpleInterfaceMonitor(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSimpleInterfaceMonitor, self).setUp()
|
||||
self.root_helper = 'sudo'
|
||||
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
root_helper=self.root_helper)
|
||||
|
||||
def test_is_active_is_false_by_default(self):
|
||||
self.assertFalse(self.monitor.is_active)
|
||||
|
||||
def test_is_active_can_be_true(self):
|
||||
self.monitor.data_received = True
|
||||
self.monitor._kill_event = eventlet.event.Event()
|
||||
self.assertTrue(self.monitor.is_active)
|
||||
|
||||
def test_has_updates_is_true_by_default(self):
|
||||
self.assertTrue(self.monitor.has_updates)
|
||||
|
||||
def test_has_updates_is_false_if_active_with_no_output(self):
|
||||
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
'.is_active')
|
||||
with mock.patch(target,
|
||||
new_callable=mock.PropertyMock(return_value=True)):
|
||||
self.assertFalse(self.monitor.has_updates)
|
||||
|
||||
def test__kill_sets_data_received_to_false(self):
|
||||
self.monitor.data_received = True
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._kill'):
|
||||
self.monitor._kill()
|
||||
self.assertFalse(self.monitor.data_received)
|
||||
|
||||
def test__read_stdout_sets_data_received_and_returns_output(self):
|
||||
output = 'foo'
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._read_stdout',
|
||||
return_value=output):
|
||||
result = self.monitor._read_stdout()
|
||||
self.assertTrue(self.monitor.data_received)
|
||||
self.assertEqual(result, output)
|
||||
|
||||
def test__read_stdout_does_not_set_data_received_for_empty_ouput(self):
|
||||
output = None
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._read_stdout',
|
||||
return_value=output):
|
||||
self.monitor._read_stdout()
|
||||
self.assertFalse(self.monitor.data_received)
|
116
neutron/tests/unit/agent/linux/test_polling.py
Normal file
116
neutron/tests/unit/agent/linux/test_polling.py
Normal file
@ -0,0 +1,116 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.agent.linux import polling
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestGetPollingManager(base.BaseTestCase):
|
||||
|
||||
def test_return_always_poll_by_default(self):
|
||||
with polling.get_polling_manager() as pm:
|
||||
self.assertEqual(pm.__class__, polling.AlwaysPoll)
|
||||
|
||||
def test_manage_polling_minimizer(self):
|
||||
mock_target = 'neutron.agent.linux.polling.InterfacePollingMinimizer'
|
||||
with mock.patch('%s.start' % mock_target) as mock_start:
|
||||
with mock.patch('%s.stop' % mock_target) as mock_stop:
|
||||
with polling.get_polling_manager(minimize_polling=True,
|
||||
root_helper='test') as pm:
|
||||
self.assertEqual(pm._monitor.root_helper, 'test')
|
||||
self.assertEqual(pm.__class__,
|
||||
polling.InterfacePollingMinimizer)
|
||||
mock_stop.assert_has_calls(mock.call())
|
||||
mock_start.assert_has_calls(mock.call())
|
||||
|
||||
|
||||
class TestBasePollingManager(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestBasePollingManager, self).setUp()
|
||||
self.pm = polling.BasePollingManager()
|
||||
|
||||
def test_force_polling_sets_interval_attribute(self):
|
||||
self.assertFalse(self.pm._force_polling)
|
||||
self.pm.force_polling()
|
||||
self.assertTrue(self.pm._force_polling)
|
||||
|
||||
def test_polling_completed_sets_interval_attribute(self):
|
||||
self.pm._polling_completed = False
|
||||
self.pm.polling_completed()
|
||||
self.assertTrue(self.pm._polling_completed)
|
||||
|
||||
def mock_is_polling_required(self, return_value):
|
||||
return mock.patch.object(self.pm, '_is_polling_required',
|
||||
return_value=return_value)
|
||||
|
||||
def test_is_polling_required_returns_true_when_forced(self):
|
||||
with self.mock_is_polling_required(False):
|
||||
self.pm.force_polling()
|
||||
self.assertTrue(self.pm.is_polling_required)
|
||||
self.assertFalse(self.pm._force_polling)
|
||||
|
||||
def test_is_polling_required_returns_true_when_polling_not_completed(self):
|
||||
with self.mock_is_polling_required(False):
|
||||
self.pm._polling_completed = False
|
||||
self.assertTrue(self.pm.is_polling_required)
|
||||
|
||||
def test_is_polling_required_returns_true_when_updates_are_present(self):
|
||||
with self.mock_is_polling_required(True):
|
||||
self.assertTrue(self.pm.is_polling_required)
|
||||
self.assertFalse(self.pm._polling_completed)
|
||||
|
||||
def test_is_polling_required_returns_false_for_no_updates(self):
|
||||
with self.mock_is_polling_required(False):
|
||||
self.assertFalse(self.pm.is_polling_required)
|
||||
|
||||
|
||||
class TestAlwaysPoll(base.BaseTestCase):
|
||||
|
||||
def test_is_polling_required_always_returns_true(self):
|
||||
pm = polling.AlwaysPoll()
|
||||
self.assertTrue(pm.is_polling_required)
|
||||
|
||||
|
||||
class TestInterfacePollingMinimizer(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestInterfacePollingMinimizer, self).setUp()
|
||||
self.pm = polling.InterfacePollingMinimizer()
|
||||
|
||||
def test_start_calls_monitor_start(self):
|
||||
with mock.patch.object(self.pm._monitor, 'start') as mock_start:
|
||||
self.pm.start()
|
||||
mock_start.assert_called_with()
|
||||
|
||||
def test_stop_calls_monitor_stop(self):
|
||||
with mock.patch.object(self.pm._monitor, 'stop') as mock_stop:
|
||||
self.pm.stop()
|
||||
mock_stop.assert_called_with()
|
||||
|
||||
def mock_has_updates(self, return_value):
|
||||
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
'.has_updates')
|
||||
return mock.patch(
|
||||
target,
|
||||
new_callable=mock.PropertyMock(return_value=return_value),
|
||||
)
|
||||
|
||||
def test__is_polling_required_returns_when_updates_are_present(self):
|
||||
with self.mock_has_updates(True):
|
||||
self.assertTrue(self.pm._is_polling_required())
|
@ -583,6 +583,14 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
self.agent.reclaim_local_vlan('net2')
|
||||
del_port_fn.assert_called_once_with('gre-ip_agent_2')
|
||||
|
||||
def test_daemon_loop_uses_polling_manager(self):
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.polling.get_polling_manager') as mock_get_pm:
|
||||
with mock.patch.object(self.agent, 'rpc_loop') as mock_loop:
|
||||
self.agent.daemon_loop()
|
||||
mock_get_pm.assert_called_with(False, 'sudo')
|
||||
mock_loop.called_once()
|
||||
|
||||
|
||||
class AncillaryBridgesTest(base.BaseTestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user