Minor refactoring of auto_schedule_routers

The method is more complicated than it needs to be, which makes it difficult
to target fixes at it.

This is done in preparation for a fix for the DB lock timeout errors observed
while dealing with DVR routers.

Test coverage is already provided, and more granular coverage is added to
reflect the new structure being introduced.

Partial-bug: #1356121

Change-Id: Ifb7a742b64139f3a5d9b88c3c6261b1b890946f9
This commit is contained in:
armando-migliaccio 2014-08-15 15:55:21 -07:00
parent f3cd90bf69
commit e23cc25ea9
4 changed files with 257 additions and 68 deletions

View File

@ -82,6 +82,23 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
raise ext_agent.AgentNotFound(id=id) raise ext_agent.AgentNotFound(id=id)
return agent return agent
def get_enabled_agent_on_host(self, context, agent_type, host):
    """Return the enabled agent of agent_type running on the given host.

    :param context: request context whose session is used for the query
    :param agent_type: value matched against Agent.agent_type
    :param host: value matched against Agent.host
    :returns: the matching Agent with admin_state_up set, or None when
              no such row exists. An agent whose heartbeat is considered
              down is still returned; a warning is logged for it.
    """
    query = context.session.query(Agent)
    query = query.filter(Agent.agent_type == agent_type,
                         Agent.host == host,
                         Agent.admin_state_up == sql.true())
    try:
        # NOTE(review): one() also raises MultipleResultsFound when more
        # than one row matches; presumably a uniqueness constraint on
        # (agent_type, host) rules that out -- confirm against the model.
        agent = query.one()
    except exc.NoResultFound:
        # Pass the values as logger arguments rather than interpolating
        # with '%', so the message is only built if the record is emitted.
        LOG.debug('No enabled %(agent_type)s agent on host '
                  '%(host)s', {'agent_type': agent_type, 'host': host})
        return None
    if self.is_agent_down(agent.heartbeat_timestamp):
        LOG.warn(_('%(agent_type)s agent %(agent_id)s is not active'),
                 {'agent_type': agent_type, 'agent_id': agent.id})
    return agent
@classmethod @classmethod
def is_agent_down(cls, heart_beat_time): def is_agent_down(cls, heart_beat_time):
return timeutils.is_older_than(heart_beat_time, return timeutils.is_older_than(heart_beat_time,

View File

@ -18,11 +18,9 @@ import random
from oslo.db import exception as db_exc from oslo.db import exception as db_exc
import six import six
from sqlalchemy.orm import exc
from sqlalchemy import sql from sqlalchemy import sql
from neutron.common import constants from neutron.common import constants
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db from neutron.db import l3_db
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -52,88 +50,91 @@ class L3Scheduler(object):
return query.count() > 0 return query.count() > 0
def filter_unscheduled_routers(self, context, plugin, routers):
    """Return the routers from the given list that still need scheduling.

    A router is dropped only when an L3 agent with admin_state_up=True
    already hosts it and the router is not distributed; distributed
    routers are always kept.

    :param context: the context
    :param plugin: plugin queried for the agents hosting each router
    :param routers: list of router dicts to examine
    :returns: the sub-list of routers that are not scheduled yet
    """
    pending = []
    for router in routers:
        hosting_agents = plugin.get_l3_agents_hosting_routers(
            context, [router['id']], admin_state_up=True)
        # TODO(armando-migliaccio): remove dvr-related check
        if not hosting_agents or router.get('distributed', False):
            pending.append(router)
            continue
        LOG.debug(('Router %(router_id)s has already been '
                   'hosted by L3 agent %(agent_id)s'),
                  {'router_id': router['id'],
                   'agent_id': hosting_agents[0]['id']})
    return pending
def get_unscheduled_routers(self, context, plugin):
    """Return the routers that have no RouterL3AgentBinding row.

    :param context: context whose session is used for the id query
    :param plugin: plugin used to load the router dicts by id
    :returns: list of routers without a binding, or [] when there are none
    """
    # TODO(gongysh) consider the disabled agent's router
    binding = l3_agentschedulers_db.RouterL3AgentBinding
    unbound = ~sql.exists().where(l3_db.Router.id == binding.router_id)
    rows = context.session.query(l3_db.Router.id).filter(unbound)
    # Each row is a one-element tuple holding the router id.
    router_ids = [row[0] for row in rows]
    if not router_ids:
        return []
    return plugin.get_routers(context, filters={'id': router_ids})
def get_routers_to_schedule(self, context, plugin, router_ids=None):
    """Work out which routers still have to be scheduled.

    :param context: the context
    :param plugin: the core plugin
    :param router_ids: the list of routers to be checked for scheduling;
                       when None, get_unscheduled_routers decides instead
    :returns: the list of routers to be scheduled
    """
    if router_ids is None:
        return self.get_unscheduled_routers(context, plugin)
    # An explicit list (even an empty one) is looked up and then filtered.
    candidates = plugin.get_routers(context, filters={'id': router_ids})
    return self.filter_unscheduled_routers(context, plugin, candidates)
def get_routers_can_schedule(self, context, plugin, routers, l3_agent):
    """Return the routers from the list that the L3 agent can host.

    :param context: the context
    :param plugin: plugin asked for candidate agents per router
    :param routers: list of router dicts to check
    :param l3_agent: the single agent offered as a candidate
    :returns: routers, in their original order, whose id was not rejected
    """
    # A router is rejected when the plugin reports no candidate among
    # the single agent offered.
    rejected_ids = {
        router['id'] for router in routers
        if not plugin.get_l3_agent_candidates(context, router, [l3_agent])
    }
    return [router for router in routers if router['id'] not in rejected_ids]
def auto_schedule_routers(self, plugin, context, host, router_ids): def auto_schedule_routers(self, plugin, context, host, router_ids):
"""Schedule non-hosted routers to L3 Agent running on host. """Schedule non-hosted routers to L3 Agent running on host.
If router_ids is given, each router in router_ids is scheduled If router_ids is given, each router in router_ids is scheduled
if it is not scheduled yet. Otherwise all unscheduled routers if it is not scheduled yet. Otherwise all unscheduled routers
are scheduled. are scheduled.
Don't schedule the routers which are hosted already Do not schedule the routers which are hosted already
by active l3 agents. by active l3 agents.
:returns: True if routers have been successfully assigned to host
""" """
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
# query if we have valid l3 agent on the host l3_agent = plugin.get_enabled_agent_on_host(
query = context.session.query(agents_db.Agent) context, constants.AGENT_TYPE_L3, host)
query = query.filter(agents_db.Agent.agent_type == if not l3_agent:
constants.AGENT_TYPE_L3,
agents_db.Agent.host == host,
agents_db.Agent.admin_state_up == sql.true())
try:
l3_agent = query.one()
except (exc.MultipleResultsFound, exc.NoResultFound):
LOG.debug(_('No enabled L3 agent on host %s'),
host)
return False return False
if agents_db.AgentDbMixin.is_agent_down(
l3_agent.heartbeat_timestamp): unscheduled_routers = self.get_routers_to_schedule(
LOG.warn(_('L3 agent %s is not active'), l3_agent.id) context, plugin, router_ids)
# check if each of the specified routers is hosted
if router_ids:
routers = plugin.get_routers(
context, filters={'id': router_ids})
unscheduled_routers = []
for router in routers:
l3_agents = plugin.get_l3_agents_hosting_routers(
context, [router['id']], admin_state_up=True)
if l3_agents and not router.get('distributed', False):
LOG.debug(_('Router %(router_id)s has already been'
' hosted by L3 agent %(agent_id)s'),
{'router_id': router['id'],
'agent_id': l3_agents[0]['id']})
else:
unscheduled_routers.append(router)
if not unscheduled_routers: if not unscheduled_routers:
# all (specified) routers are already scheduled
return False return False
else:
# get all routers that are not hosted
#TODO(gongysh) consider the disabled agent's router
stmt = ~sql.exists().where(
l3_db.Router.id ==
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
unscheduled_router_ids = [router_id_[0] for router_id_ in
context.session.query(
l3_db.Router.id).filter(stmt)]
if not unscheduled_router_ids:
LOG.debug(_('No non-hosted routers'))
return False
unscheduled_routers = plugin.get_routers(
context, filters={'id': unscheduled_router_ids})
# check if the configuration of l3 agent is compatible target_routers = self.get_routers_can_schedule(
# with the router context, plugin, unscheduled_routers, l3_agent)
to_removed_ids = set()
for router in unscheduled_routers:
candidates = plugin.get_l3_agent_candidates(context,
router,
[l3_agent])
if not candidates:
to_removed_ids.add(router['id'])
target_routers = [r for r in unscheduled_routers
if r['id'] not in to_removed_ids]
if not target_routers: if not target_routers:
LOG.warn(_('No routers compatible with L3 agent configuration' LOG.warn(_('No routers compatible with L3 agent configuration'
' on host %s'), host) ' on host %s'), host)
return False return False
for router_dict in target_routers: self.bind_routers(context, target_routers, l3_agent)
if (router_dict.get('distributed', False)
and self.dvr_has_binding(context,
router_dict['id'],
l3_agent.id)):
continue
self.bind_router(context, router_dict['id'], l3_agent)
return True return True
def get_candidates(self, plugin, context, sync_router, subnet_id): def get_candidates(self, plugin, context, sync_router, subnet_id):
@ -173,6 +174,13 @@ class L3Scheduler(object):
return candidates return candidates
def bind_routers(self, context, routers, l3_agent):
    """Bind each router to the agent, skipping already-bound DVR routers.

    :param context: the context
    :param routers: list of router dicts to bind
    :param l3_agent: agent to bind to; its id is read for DVR routers only
    """
    for router in routers:
        router_id = router['id']
        # Only distributed routers are checked for an existing binding.
        already_bound = (
            router.get('distributed', False) and
            self.dvr_has_binding(context, router_id, l3_agent.id))
        if not already_bound:
            self.bind_router(context, router_id, l3_agent)
def bind_router(self, context, router_id, chosen_agent): def bind_router(self, context, router_id, chosen_agent):
"""Bind the router to the l3 agent which has been chosen.""" """Bind the router to the l3 agent which has been chosen."""
try: try:

View File

@ -16,9 +16,11 @@
import mock import mock
from oslo.db import exception as exc from oslo.db import exception as exc
from neutron.common import constants
from neutron import context from neutron import context
from neutron.db import agents_db from neutron.db import agents_db
from neutron.db import db_base_plugin_v2 as base_plugin from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.openstack.common import timeutils
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
@ -40,6 +42,35 @@ class TestAgentsDbMixin(testlib_api.SqlTestCase):
'topic': 'N/A' 'topic': 'N/A'
} }
def _add_agent(self, agent_id, agent_type, agent_host):
    """Store an enabled agent with a fresh heartbeat and return it.

    created_at, started_at and heartbeat_timestamp all share one
    timestamp taken when the helper runs.
    """
    with self.context.session.begin(subtransactions=True):
        timestamp = timeutils.utcnow()
        agent = agents_db.Agent(
            id=agent_id,
            agent_type=agent_type,
            host=agent_host,
            binary='foo_binary',
            topic='foo_topic',
            created_at=timestamp,
            started_at=timestamp,
            heartbeat_timestamp=timestamp,
            admin_state_up=True,
            configurations='')
        self.context.session.add(agent)
        return agent
def test_get_enabled_agent_on_host_found(self):
    """An enabled agent stored for the host is returned by the lookup."""
    stored = self._add_agent('foo_id', constants.AGENT_TYPE_L3, 'foo_host')
    found = self.plugin.get_enabled_agent_on_host(
        self.context, constants.AGENT_TYPE_L3, 'foo_host')
    self.assertEqual(found, stored)
def test_get_enabled_agent_on_host_not_found(self):
    """With no agent stored, the lookup yields None and logs at debug."""
    debug_patch = mock.patch.object(agents_db.LOG, 'debug')
    with debug_patch as debug_log:
        result = self.plugin.get_enabled_agent_on_host(
            self.context, constants.AGENT_TYPE_L3, 'foo_agent')
    self.assertIsNone(result)
    self.assertTrue(debug_log.called)
def _assert_ref_fields_are_equal(self, reference, result): def _assert_ref_fields_are_equal(self, reference, result):
"""Compare (key, value) pairs of a reference dict with the result """Compare (key, value) pairs of a reference dict with the result

View File

@ -35,6 +35,7 @@ from neutron.extensions import l3 as ext_l3
from neutron import manager from neutron import manager
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
from neutron.scheduler import l3_agent_scheduler from neutron.scheduler import l3_agent_scheduler
from neutron.tests import base
from neutron.tests.unit import test_db_plugin from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
@ -83,6 +84,138 @@ DB_PLUGIN_KLASS = ('neutron.plugins.openvswitch.ovs_neutron_plugin.'
'OVSNeutronPluginV2') 'OVSNeutronPluginV2')
class FakeL3Scheduler(l3_agent_scheduler.L3Scheduler):
    """L3Scheduler subclass with no-op scheduling hooks, for unit tests."""

    def schedule(self):
        """No-op stub."""

    def _choose_router_agent(self):
        """No-op stub."""
class L3SchedulerBaseTestCase(base.BaseTestCase):
    """Exercise the L3Scheduler helper methods against a mocked plugin."""

    def setUp(self):
        super(L3SchedulerBaseTestCase, self).setUp()
        self.scheduler = FakeL3Scheduler()
        self.plugin = mock.Mock()
        self.context = q_context.get_admin_context()

    def test_auto_schedule_routers(self):
        self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
        # Nested 'with' blocks replace the deprecated contextlib.nested.
        with mock.patch.object(
                self.scheduler, 'get_routers_to_schedule') as mock_to_schedule:
            with mock.patch.object(
                    self.scheduler,
                    'get_routers_can_schedule') as mock_can_schedule:
                result = self.scheduler.auto_schedule_routers(
                    self.plugin, self.context, mock.ANY, mock.ANY)
        self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
        self.assertTrue(result)
        self.assertTrue(mock_to_schedule.called)
        self.assertTrue(mock_can_schedule.called)

    def test_auto_schedule_routers_no_agents(self):
        self.plugin.get_enabled_agent_on_host.return_value = None
        result = self.scheduler.auto_schedule_routers(
            self.plugin, self.context, mock.ANY, mock.ANY)
        self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
        self.assertFalse(result)

    def test_auto_schedule_routers_no_unscheduled_routers(self):
        with mock.patch.object(self.scheduler,
                               'get_routers_to_schedule') as mock_routers:
            mock_routers.return_value = None
            result = self.scheduler.auto_schedule_routers(
                self.plugin, self.context, mock.ANY, mock.ANY)
        self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
        self.assertFalse(result)

    def test_auto_schedule_routers_no_target_routers(self):
        self.plugin.get_enabled_agent_on_host.return_value = [mock.ANY]
        with mock.patch.object(
                self.scheduler,
                'get_routers_to_schedule') as mock_unscheduled_routers:
            with mock.patch.object(
                    self.scheduler,
                    'get_routers_can_schedule') as mock_target_routers:
                mock_unscheduled_routers.return_value = mock.ANY
                mock_target_routers.return_value = None
                result = self.scheduler.auto_schedule_routers(
                    self.plugin, self.context, mock.ANY, mock.ANY)
        self.assertTrue(self.plugin.get_enabled_agent_on_host.called)
        self.assertFalse(result)

    def test_get_routers_to_schedule_with_router_ids(self):
        router_ids = ['foo_router_1', 'foo_router_2']
        # Build the fixture from router_ids so the ids cannot drift apart.
        expected_routers = [{'id': router_id} for router_id in router_ids]
        self.plugin.get_routers.return_value = expected_routers
        with mock.patch.object(self.scheduler,
                               'filter_unscheduled_routers') as mock_filter:
            mock_filter.return_value = expected_routers
            unscheduled_routers = self.scheduler.get_routers_to_schedule(
                mock.ANY, self.plugin, router_ids)
        # The requested ids must be what the plugin is asked to look up.
        self.plugin.get_routers.assert_called_once_with(
            mock.ANY, filters={'id': router_ids})
        mock_filter.assert_called_once_with(
            mock.ANY, self.plugin, expected_routers)
        self.assertEqual(expected_routers, unscheduled_routers)

    def test_get_routers_to_schedule_without_router_ids(self):
        expected_routers = [
            {'id': 'foo_router1'}, {'id': 'foo_router_2'}
        ]
        with mock.patch.object(self.scheduler,
                               'get_unscheduled_routers') as mock_get:
            mock_get.return_value = expected_routers
            unscheduled_routers = self.scheduler.get_routers_to_schedule(
                mock.ANY, self.plugin)
        mock_get.assert_called_once_with(mock.ANY, self.plugin)
        self.assertEqual(expected_routers, unscheduled_routers)

    def _test_get_routers_can_schedule(self, routers, agent, target_routers):
        # 'agent' is what the mocked plugin reports as candidates.
        self.plugin.get_l3_agent_candidates.return_value = agent
        result = self.scheduler.get_routers_can_schedule(
            mock.ANY, self.plugin, routers, mock.ANY)
        self.assertEqual(target_routers, result)

    def _test_filter_unscheduled_routers(self, routers, agents, expected):
        # 'agents' is what the mocked plugin reports as hosting each router.
        self.plugin.get_l3_agents_hosting_routers.return_value = agents
        unscheduled_routers = self.scheduler.filter_unscheduled_routers(
            mock.ANY, self.plugin, routers)
        self.assertEqual(expected, unscheduled_routers)

    def test_filter_unscheduled_routers_already_scheduled(self):
        self._test_filter_unscheduled_routers(
            [{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
            [{'id': 'foo_agent_id'}], [])

    def test_filter_unscheduled_routers_non_scheduled(self):
        self._test_filter_unscheduled_routers(
            [{'id': 'foo_router1'}, {'id': 'foo_router_2'}],
            None, [{'id': 'foo_router1'}, {'id': 'foo_router_2'}])

    def test_get_routers_can_schedule_with_compat_agent(self):
        routers = [{'id': 'foo_router'}]
        self._test_get_routers_can_schedule(routers, mock.ANY, routers)

    def test_get_routers_can_schedule_with_no_compat_agent(self):
        routers = [{'id': 'foo_router'}]
        self._test_get_routers_can_schedule(routers, None, [])

    def test_bind_routers_centralized(self):
        routers = [{'id': 'foo_router'}]
        with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
            self.scheduler.bind_routers(mock.ANY, routers, mock.ANY)
        mock_bind.assert_called_once_with(mock.ANY, 'foo_router', mock.ANY)

    def test_bind_routers_dvr(self):
        routers = [{'id': 'foo_router', 'distributed': True}]
        agent = agents_db.Agent(id='foo_agent')
        with mock.patch.object(self.scheduler, 'dvr_has_binding') as mock_dvr:
            # State the existing binding explicitly instead of relying on
            # the truthiness of a default Mock return value.
            mock_dvr.return_value = True
            with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
                self.scheduler.bind_routers(mock.ANY, routers, agent)
        mock_dvr.assert_called_once_with(mock.ANY, 'foo_router', 'foo_agent')
        self.assertFalse(mock_bind.called)

    def test_bind_routers_dvr_without_binding(self):
        routers = [{'id': 'foo_router', 'distributed': True}]
        agent = agents_db.Agent(id='foo_agent')
        with mock.patch.object(self.scheduler, 'dvr_has_binding') as mock_dvr:
            mock_dvr.return_value = False
            with mock.patch.object(self.scheduler, 'bind_router') as mock_bind:
                self.scheduler.bind_routers(mock.ANY, routers, agent)
        mock_dvr.assert_called_once_with(mock.ANY, 'foo_router', 'foo_agent')
        mock_bind.assert_called_once_with(mock.ANY, 'foo_router', agent)
class L3SchedulerTestExtensionManager(object): class L3SchedulerTestExtensionManager(object):
def get_resources(self): def get_resources(self):