Add L3 Scheduler Changes for Distributed Routers

This patch implements the L3 Scheduler changes for the
Distributed Virtual Routers.

Partially-implements: blueprint neutron-ovs-dvr

Change-Id: I407c3d639ebdf885b1418bceac7cfc251e7eba1f
Co-Authored-By: Carl Baldwin <carl.baldwin@hp.com>
Co-Authored-By: Armando Migliaccio <armamig@gmail.com>
This commit is contained in:
Murali Birru 2014-04-22 13:45:31 -07:00 committed by armando-migliaccio
parent 4f3778f325
commit 77409d9adc
17 changed files with 806 additions and 95 deletions

View File

@ -66,6 +66,32 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
topic='%s.%s' % (l3_agent.topic, l3_agent.host),
version='1.1')
def _agent_notification_arp(self, context, method, router_id,
                            operation, data):
    """Notify arp details to l3 agents hosting router.

    :param method: RPC method to cast ('add_arp_entry'/'del_arp_entry').
    :param router_id: target router; a falsy value makes this a no-op.
    :param operation: currently unused; kept for notifier signature
        symmetry with the other notification helpers.
    :param data: ARP table payload forwarded to each hosting agent.
    """
    if not router_id:
        return
    # Agent lookups need admin rights; elevate only when necessary.
    adminContext = (context.is_admin and
                    context or context.elevated())
    plugin = manager.NeutronManager.get_service_plugins().get(
        service_constants.L3_ROUTER_NAT)
    l3_agents = (plugin.
                 get_l3_agents_hosting_routers(adminContext,
                                               [router_id],
                                               admin_state_up=True,
                                               active=True))
    # TODO(murali): replace cast with fanout to avoid performance
    # issues at greater scale.
    for l3_agent in l3_agents:
        # Per-agent topic so only the hosting agents receive the cast.
        topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
        LOG.debug('Casting message %(method)s with topic %(topic)s',
                  {'topic': topic, 'method': method})
        dvr_arptable = {'router_id': router_id,
                        'arp_table': data}
        self.cast(context,
                  self.make_msg(method, payload=dvr_arptable),
                  topic=topic, version='1.2')
def _notification(self, context, method, router_ids, operation, data):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_service_plugins().get(
@ -78,7 +104,7 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
adminContext = (context.is_admin and
context or context.elevated())
plugin.schedule_routers(adminContext, router_ids)
plugin.schedule_routers(adminContext, router_ids, hints=data)
self._agent_notification(
context, method, router_ids, operation, data)
else:
@ -112,6 +138,14 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
self._notification(context, 'routers_updated', router_ids,
operation, data)
def add_arp_entry(self, context, router_id, arp_table, operation=None):
    """Cast 'add_arp_entry' with *arp_table* to agents hosting the router."""
    self._agent_notification_arp(context, 'add_arp_entry', router_id,
                                 operation, arp_table)
def del_arp_entry(self, context, router_id, arp_table, operation=None):
    """Cast 'del_arp_entry' with *arp_table* to agents hosting the router."""
    self._agent_notification_arp(context, 'del_arp_entry', router_id,
                                 operation, arp_table)
def router_removed_from_agent(self, context, router_id, host):
    """Tell the l3 agent on *host* that it no longer hosts *router_id*."""
    self._notification_host(context, 'router_removed_from_agent',
                            {'router_id': router_id}, host)

View File

@ -25,7 +25,7 @@ from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import model_base
from neutron.extensions import l3agentscheduler
from neutron import manager
L3_AGENTS_SCHEDULER_OPTS = [
cfg.StrOpt('router_scheduler_driver',
@ -62,13 +62,32 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
def add_router_to_l3_agent(self, context, agent_id, router_id):
"""Add a l3 agent to host a router."""
router = self.get_router(context, router_id)
distributed = router.get('distributed')
with context.session.begin(subtransactions=True):
agent_db = self._get_agent(context, agent_id)
agent_conf = self.get_configuration_dict(agent_db)
agent_mode = agent_conf.get('agent_mode', 'legacy')
if (not distributed and agent_mode == 'dvr' or
distributed and agent_mode == 'legacy'):
router_type = ('distributed' if distributed else 'centralized')
raise l3agentscheduler.RouterL3AgentMismatch(
router_type=router_type, router_id=router_id,
agent_mode=agent_mode, agent_id=agent_id)
if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
not agent_db['admin_state_up'] or
not self.get_l3_agent_candidates(router, [agent_db])):
not self.get_l3_agent_candidates(context,
router,
[agent_db])):
raise l3agentscheduler.InvalidL3Agent(id=agent_id)
query = context.session.query(RouterL3AgentBinding)
if distributed:
binding = query.filter_by(router_id=router_id,
l3_agent_id=agent_id).first()
if binding:
raise l3agentscheduler.RouterHostedByL3Agent(
router_id=router_id,
agent_id=binding.l3_agent_id)
else:
try:
binding = query.filter_by(router_id=router_id).one()
@ -238,7 +257,57 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
if agentschedulers_db.AgentSchedulerDbMixin.is_eligible_agent(
active, l3_agent)]
def get_l3_agent_candidates(self, sync_router, l3_agents):
def check_vmexists_on_l3agent(self, context, l3_agent, router_id,
                              subnet_id):
    """Return True if the agent's host has a compute port on the subnet.

    With no subnet hint there is nothing to restrict scheduling by, so
    True is returned.  Otherwise the agent is a candidate only when at
    least one compute ("compute:*" device_owner) port on *subnet_id* is
    bound to the agent's host.
    """
    if not subnet_id:
        return True
    core_plugin = manager.NeutronManager.get_plugin()
    # Renamed from 'filter' to avoid shadowing the builtin.
    port_filters = {'fixed_ips': {'subnet_id': [subnet_id]}}
    ports = core_plugin.get_ports(context, filters=port_filters)
    return any("compute:" in port['device_owner'] and
               l3_agent['host'] == port['binding:host_id']
               for port in ports)
def get_snat_candidates(self, sync_router, l3_agents):
    """Get the valid snat enabled l3 agents for the distributed router."""
    candidates = []
    # Only distributed routers get a centralized SNAT placement.
    is_router_distributed = sync_router.get('distributed', False)
    if not is_router_distributed:
        return candidates
    for l3_agent in l3_agents:
        if not l3_agent.admin_state_up:
            continue
        agent_conf = self.get_configuration_dict(l3_agent)
        agent_mode = agent_conf.get('agent_mode', 'legacy')
        # Only agents running in 'dvr_snat' mode may host SNAT.
        if agent_mode != 'dvr_snat':
            continue
        router_id = agent_conf.get('router_id', None)
        use_namespaces = agent_conf.get('use_namespaces', True)
        # An agent configured without namespaces serves a single,
        # fixed router; skip it unless it is this router.
        if not use_namespaces and router_id != sync_router['id']:
            continue
        handle_internal_only_routers = agent_conf.get(
            'handle_internal_only_routers', True)
        gateway_external_network_id = agent_conf.get(
            'gateway_external_network_id', None)
        ex_net_id = (sync_router['external_gateway_info'] or {}).get(
            'network_id')
        # Skip agents that refuse internal-only routers, or that are
        # pinned to a different external network than the router's.
        if ((not ex_net_id and not handle_internal_only_routers) or
            (ex_net_id and gateway_external_network_id and
             ex_net_id != gateway_external_network_id)):
            continue
        candidates.append(l3_agent)
    return candidates
def get_l3_agent_candidates(self, context, sync_router, l3_agents,
subnet_id=None):
"""Get the valid l3 agents for the router from a list of l3_agents."""
candidates = []
for l3_agent in l3_agents:
@ -251,6 +320,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
'handle_internal_only_routers', True)
gateway_external_network_id = agent_conf.get(
'gateway_external_network_id', None)
agent_mode = agent_conf.get('agent_mode', 'legacy')
if not use_namespaces and router_id != sync_router['id']:
continue
ex_net_id = (sync_router['external_gateway_info'] or {}).get(
@ -259,6 +329,12 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
(ex_net_id and gateway_external_network_id and
ex_net_id != gateway_external_network_id)):
continue
is_router_distributed = sync_router.get('distributed', False)
if not is_router_distributed and agent_mode == 'legacy':
candidates.append(l3_agent)
elif (agent_mode.startswith('dvr') and
self.check_vmexists_on_l3agent(
context, l3_agent, sync_router['id'], subnet_id)):
candidates.append(l3_agent)
return candidates
@ -267,15 +343,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
return self.router_scheduler.auto_schedule_routers(
self, context, host, router_ids)
def schedule_router(self, context, router, candidates=None):
def schedule_router(self, context, router, candidates=None, hints=None):
if self.router_scheduler:
return self.router_scheduler.schedule(
self, context, router, candidates)
self, context, router, candidates=candidates, hints=hints)
def schedule_routers(self, context, routers):
def schedule_routers(self, context, routers, hints=None):
"""Schedule the routers to l3 agents."""
for router in routers:
self.schedule_router(context, router)
self.schedule_router(context, router, candidates=None, hints=hints)
def get_l3_agent_with_min_routers(self, context, agent_ids):
"""Return l3 agent with the least number of routers."""

View File

@ -166,17 +166,20 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
# l3 agent (associated with given external network);
# do check before update in DB as an exception will be raised
# in case no proper l3 agent found
candidates = None
if gw_info != attributes.ATTR_NOT_SPECIFIED:
candidates = self._check_router_needs_rescheduling(
context, id, gw_info)
payload = {'gw_exists': True}
else:
candidates = None
payload = {'gw_exists': False}
router_db = self._update_router_db(context, id, r, gw_info)
if candidates:
l3_plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
l3_plugin.reschedule_router(context, id, candidates)
self.l3_rpc_notifier.routers_updated(context, [router_db['id']])
self.l3_rpc_notifier.routers_updated(context, [router_db['id']],
None, payload)
return self._make_router_dict(router_db)
def _check_router_needs_rescheduling(self, context, router_id, gw_info):
@ -234,8 +237,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'id': router_id,
'external_gateway_info': {'network_id': network_id}
}
candidates = l3_plugin.get_l3_agent_candidates(
router, active_agents)
candidates = l3_plugin.get_l3_agent_candidates(context,
router,
active_agents)
if not candidates:
msg = (_('No eligible l3 agent associated with external network '
'%s found') % network_id)
@ -468,7 +472,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
def notify_router_interface_action(
self, context, router_id, tenant_id, port_id, subnet_id, action):
l3_method = '%s_router_interface' % action
self.l3_rpc_notifier.routers_updated(context, [router_id], l3_method)
self.l3_rpc_notifier.routers_updated(context, [router_id],
l3_method, {'subnet_id': subnet_id})
mapping = {'add': 'create', 'remove': 'delete'}
info = {
@ -784,7 +789,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
if router_id:
self.l3_rpc_notifier.routers_updated(
context, [router_id],
'create_floatingip')
'create_floatingip', {})
return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip):
@ -806,7 +811,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router_ids.append(router_id)
if router_ids:
self.l3_rpc_notifier.routers_updated(
context, router_ids, 'update_floatingip')
context, router_ids, 'update_floatingip', {})
return self._make_floatingip_dict(floatingip_db)
def update_floatingip_status(self, context, floatingip_id, status):
@ -826,7 +831,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
if router_id:
self.l3_rpc_notifier.routers_updated(
context, [router_id],
'delete_floatingip')
'delete_floatingip', {})
def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id)
@ -915,15 +920,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
if router_ids:
self.l3_rpc_notifier.routers_updated(
context, list(router_ids),
'disassociate_floatingips')
'disassociate_floatingips', {})
def _build_routers_list(self, routers, gw_ports):
gw_port_id_gw_port_dict = dict((gw_port['id'], gw_port)
for gw_port in gw_ports)
def _build_routers_list(self, context, routers, gw_ports):
for router in routers:
gw_port_id = router['gw_port_id']
if gw_port_id:
router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
router['gw_port'] = gw_ports[gw_port_id]
return routers
def _get_sync_routers(self, context, router_ids=None, active=None):
@ -952,8 +955,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
gw_port_ids.append(gw_port_id)
gw_ports = []
if gw_port_ids:
gw_ports = self.get_sync_gw_ports(context, gw_port_ids)
return self._build_routers_list(router_dicts, gw_ports)
gw_ports = dict((gw_port['id'], gw_port)
for gw_port in
self.get_sync_gw_ports(context, gw_port_ids))
return self._build_routers_list(context, router_dicts, gw_ports)
def _get_sync_floating_ips(self, context, router_ids):
"""Query floating_ips that relate to list of router_ids."""

View File

@ -19,7 +19,9 @@ from neutron.common import constants as l3_const
from neutron.common import exceptions as n_exc
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_dvrscheduler_db as l3_dvrsched_db
from neutron.db import models_v2
from neutron.extensions import l3
from neutron.extensions import portbindings
from neutron.openstack.common import log as logging
@ -219,6 +221,32 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
self._populate_subnet_for_ports(context, interfaces)
return interfaces
def _build_routers_list(self, context, routers, gw_ports):
    """Fill each router dict with gw_port, enable_snat and gw_port_host.

    gw_ports is assumed to map gateway port id -> port dict (built by
    the caller); gw_port_host is taken from the router's centralized
    SNAT binding, or None when the router has no SNAT bound.
    """
    # Perform a single query up front for all routers
    router_ids = [r['id'] for r in routers]
    snat_binding = l3_dvrsched_db.CentralizedSnatL3AgentBinding
    query = (context.session.query(snat_binding).
             filter(snat_binding.router_id.in_(router_ids))).all()
    bindings = dict((b.router_id, b) for b in query)
    for rtr in routers:
        gw_port_id = rtr['gw_port_id']
        if gw_port_id:
            rtr['gw_port'] = gw_ports[gw_port_id]
            if 'enable_snat' in rtr[l3.EXTERNAL_GW_INFO]:
                rtr['enable_snat'] = (
                    rtr[l3.EXTERNAL_GW_INFO]['enable_snat'])
        binding = bindings.get(rtr['id'])
        if not binding:
            rtr['gw_port_host'] = None
            LOG.debug('No snat is bound to router %s', rtr['id'])
            continue
        rtr['gw_port_host'] = binding.l3_agent.host
    return routers
def _process_routers(self, context, routers):
routers_dict = {}
for router in routers:

View File

@ -0,0 +1,277 @@
# (c) Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron.common import constants as q_const
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
from neutron.db import model_base
from neutron.db import models_v2
from neutron.openstack.common import log as logging
from neutron.plugins.ml2 import db as ml2_db
LOG = logging.getLogger(__name__)
class CentralizedSnatL3AgentBinding(model_base.BASEV2):
    """Represents binding between Neutron Centralized SNAT and L3 agents."""

    __tablename__ = "csnat_l3_agent_bindings"

    # Router owning the centralized SNAT; row cascades away with router.
    router_id = sa.Column(sa.String(36),
                          sa.ForeignKey("routers.id", ondelete='CASCADE'),
                          primary_key=True)
    # dvr_snat-mode agent chosen to host the SNAT for this router.
    l3_agent_id = sa.Column(sa.String(36),
                            sa.ForeignKey("agents.id", ondelete='CASCADE'),
                            nullable=False)
    host_id = sa.Column(sa.String(255))
    # Gateway port used for SNAT on the service node.
    csnat_gw_port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
    l3_agent = orm.relationship(agents_db.Agent)
    csnat_gw_port = orm.relationship(models_v2.Port)
class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
"""Mixin class for L3 DVR scheduler."""
def dvr_update_router_addvm(self, context, port):
    """Notify l3 agents of distributed routers on the new VM's subnets.

    For each subnet the VM port has an address on, locate a DVR router
    interface port on that subnet; if its router is distributed, send a
    routers_updated notification carrying the subnet id as a hint.
    """
    ips = port['fixed_ips']
    for ip in ips:
        subnet = ip['subnet_id']
        filter_sub = {'fixed_ips': {'subnet_id': [subnet]},
                      'device_owner':
                      [q_const.DEVICE_OWNER_DVR_INTERFACE]}
        router_id = None
        # NOTE: loop variable renamed from 'port' — the original
        # shadowed the VM-port parameter inside this loop.
        intf_ports = self._core_plugin.get_ports(context,
                                                 filters=filter_sub)
        for intf_port in intf_ports:
            router_id = intf_port['device_id']
            router_dict = self.get_router(context, router_id)
            if router_dict.get('distributed', False):
                payload = {'subnet_id': subnet}
                self.l3_rpc_notifier.routers_updated(
                    context, [router_id], None, payload)
                break
        LOG.debug('DVR: dvr_update_router_addvm %s ', router_id)
def get_dvr_routers_by_vmportid(self, context, port_id):
    """Gets the dvr routers on vmport subnets."""
    vm_port = self._core_plugin.get_port(context, port_id)
    router_ids = set()
    # For every subnet the VM port lives on, collect the routers that
    # have a DVR interface port on that subnet.
    for fixed_ip in vm_port['fixed_ips']:
        dvr_filter = {'fixed_ips': {'subnet_id': [fixed_ip['subnet_id']]},
                      'device_owner':
                      [q_const.DEVICE_OWNER_DVR_INTERFACE]}
        router_ids.update(
            dvr_port['device_id']
            for dvr_port in self._core_plugin.get_ports(
                context, filters=dvr_filter))
    return router_ids
def get_subnet_ids_on_router(self, context, router_id):
    """Return subnet IDs for interfaces attached to the given router."""
    device_filter = {'device_id': [router_id]}
    router_ports = self._core_plugin.get_ports(context,
                                               filters=device_filter)
    # NOTE(review): like the original, only the first fixed IP of each
    # port is considered — assumes router ports carry a single address.
    return set(rp['fixed_ips'][0]['subnet_id'] for rp in router_ports)
def check_vm_exists_on_subnet(self, context, host, port_id, subnet_id):
    """Check if there is any vm exists on the subnet_id.

    Returns True when some OTHER active compute port on *subnet_id*
    is bound to *host*; *port_id* itself is excluded so a port being
    deleted does not keep its own namespace alive.
    """
    filter_sub = {'fixed_ips': {'subnet_id': [subnet_id]}}
    ports = self._core_plugin.get_ports(context, filters=filter_sub)
    for port in ports:
        # Must be a compute (VM) port, ACTIVE, on this host, and not
        # the port that triggered the check.
        if ("compute:" in port['device_owner']
            and port['status'] == 'ACTIVE'
            and port['binding:host_id'] == host
            and port['id'] != port_id):
            LOG.debug('DVR: VM exists for subnet %(subnet_id)s on host '
                      '%(host)s', {'subnet_id': subnet_id,
                                   'host': host})
            return True
    return False
def delete_namespace_on_host(self, context, host, router_id):
    """Delete the given router namespace on the host.

    Removes the router's scheduling binding to the l3 agent on *host*;
    the agent-side namespace cleanup follows from the unbinding.
    """
    agent = self._get_agent_by_type_and_host(
        context, q_const.AGENT_TYPE_L3, host)
    agent_id = str(agent.id)
    with context.session.begin(subtransactions=True):
        (context.session.query(l3agent_sch_db.RouterL3AgentBinding).
         filter_by(router_id=router_id, l3_agent_id=agent_id).
         delete(synchronize_session=False))
    LOG.debug('Deleted router %(router_id)s on agent.id %(id)s',
              {'router_id': router_id,
               'id': agent.id})
def dvr_deletens_if_no_vm(self, context, port_id):
    """Delete the DVR namespace if no VM exists.

    Called when a VM port is deleted.  For each DVR router reachable
    from the port's subnets, if no other VM on the port's host still
    uses any of the router's subnets, unbind the router's DVR interface
    ports from the host and remove the router's scheduling binding.

    :returns: list of {'router_id': ..., 'host': ...} dicts for the
        namespaces that were removed.
    """
    router_ids = self.get_dvr_routers_by_vmportid(context, port_id)
    port_host = ml2_db.get_port_binding_host(port_id)
    if not router_ids:
        LOG.debug('No namespaces available for this DVR port %(port)s '
                  'on host %(host)s', {'port': port_id,
                                       'host': port_host})
        return []
    removed_router_info = []
    for router_id in router_ids:
        # Keep the namespace while any other VM on this host still
        # uses one of the router's subnets.
        subnet_ids = self.get_subnet_ids_on_router(context, router_id)
        vm_exists_on_subnet = False
        for subnet in subnet_ids:
            if self.check_vm_exists_on_subnet(context,
                                              port_host,
                                              port_id,
                                              subnet):
                vm_exists_on_subnet = True
                break
        if vm_exists_on_subnet:
            continue
        filter_rtr = {'device_id': [router_id],
                      'device_owner':
                      [q_const.DEVICE_OWNER_DVR_INTERFACE]}
        int_ports = self._core_plugin.get_ports(
            context, filters=filter_rtr)
        for prt in int_ports:
            dvr_binding = (ml2_db.
                           get_dvr_port_binding_by_host(context.session,
                                                        prt['id'],
                                                        port_host))
            if dvr_binding:
                # unbind this port from router
                dvr_binding['router_id'] = None
                dvr_binding.update(dvr_binding)
        self.delete_namespace_on_host(context, port_host, router_id)
        info = {'router_id': router_id, 'host': port_host}
        removed_router_info.append(info)
        LOG.debug('Deleted router namespace %(router_id)s '
                  'on host %(host)s', info)
    return removed_router_info
def bind_snat_router(self, context, router_id, chosen_agent):
    """Bind the router to the chosen l3 agent.

    Creates the csnat_l3_agent_bindings row recording which dvr_snat
    agent hosts this router's centralized SNAT.
    """
    with context.session.begin(subtransactions=True):
        binding = CentralizedSnatL3AgentBinding()
        binding.l3_agent = chosen_agent
        binding.router_id = router_id
        context.session.add(binding)
        LOG.debug('SNAT Router %(router_id)s is scheduled to L3 agent '
                  '%(agent_id)s', {'router_id': router_id,
                                   'agent_id': chosen_agent.id})
def bind_dvr_router_servicenode(self, context, router_id,
                                chosen_snat_agent):
    """Bind the IR router to service node if not already hosted.

    Adds a RouterL3AgentBinding between the (distributed) router and
    the SNAT agent, unless such a binding already exists.
    """
    query = (context.session.query(l3agent_sch_db.RouterL3AgentBinding).
             filter_by(router_id=router_id))
    for bind in query:
        if bind.l3_agent_id == chosen_snat_agent.id:
            # Already bound to this agent; nothing to do.
            LOG.debug('Distributed Router %(router_id)s already hosted '
                      'on snat l3_agent %(snat_id)s',
                      {'router_id': router_id,
                       'snat_id': chosen_snat_agent.id})
            return
    with context.session.begin(subtransactions=True):
        binding = l3agent_sch_db.RouterL3AgentBinding()
        binding.l3_agent = chosen_snat_agent
        binding.router_id = router_id
        context.session.add(binding)
        LOG.debug('Binding the distributed router %(router_id)s to '
                  'the snat agent %(snat_id)s',
                  {'router_id': router_id,
                   'snat_id': chosen_snat_agent.id})
def bind_snat_servicenode(self, context, router_id, snat_candidates):
    """Bind the snat router to the chosen l3 service agent."""
    # Spread SNAT load by picking one eligible agent at random.
    chosen = random.choice(snat_candidates)
    self.bind_snat_router(context, router_id, chosen)
def unbind_snat_servicenode(self, context, router_id):
    """Unbind the snat router to the chosen l3 service agent.

    Deletes the centralized SNAT binding; additionally, if no VM port
    remains on the agent's host for any of the router's subnets, the
    regular router/agent scheduling binding is removed too and the
    agent is notified that the router was removed from it.
    """
    vm_ports = []
    with context.session.begin(subtransactions=True):
        query = (context.session.
                 query(CentralizedSnatL3AgentBinding).
                 filter_by(router_id=router_id))
        try:
            binding = query.one()
        except exc.NoResultFound:
            # Nothing was bound; silently succeed.
            LOG.debug('no snat router binding found for %s', router_id)
            return
        host = binding.l3_agent.host
        subnet_ids = self.get_subnet_ids_on_router(context, router_id)
        for subnet in subnet_ids:
            vm_ports = (
                self.get_compute_ports_on_host_by_subnet(
                    context, host, subnet))
            if vm_ports:
                LOG.debug('VM exists on the snat enabled l3_agent '
                          'host %(host)s and router_id %(router_id)s',
                          {'host': host, 'router_id': router_id})
                break
        agent_id = binding.l3_agent_id
        LOG.debug('Delete binding of the SNAT router %(router_id)s '
                  'from agent %(id)s', {'router_id': router_id,
                                        'id': agent_id})
        context.session.delete(binding)
        if not vm_ports:
            # No VMs left on the host: drop the scheduling binding as
            # well and tell the agent the router is gone.
            query = (context.session.
                     query(l3agent_sch_db.RouterL3AgentBinding).
                     filter_by(router_id=router_id,
                               l3_agent_id=agent_id).
                     delete(synchronize_session=False))
            self.l3_rpc_notifier.router_removed_from_agent(
                context, router_id, host)
            LOG.debug('Removed binding for router %(router_id)s and '
                      'agent %(id)s', {'router_id': router_id,
                                       'id': agent_id})
def schedule_snat_router(self, context, router_id, gw_exists):
    """Schedule the snat router on l3 service agent.

    If the router already has a SNAT binding (and a gateway), re-assert
    the distributed-router binding on that agent and return.  Otherwise
    pick a dvr_snat candidate and bind it, or unbind when no candidate
    qualifies.
    """
    if gw_exists:
        query = (context.session.
                 query(CentralizedSnatL3AgentBinding).
                 filter_by(router_id=router_id))
        for bind in query:
            agt_id = bind.l3_agent_id
            LOG.debug('SNAT Router %(router_id)s has already been '
                      'hosted by L3 agent '
                      '%(agent_id)s', {'router_id': router_id,
                                       'agent_id': agt_id})
            # Make sure the IR binding exists on the same service node.
            self.bind_dvr_router_servicenode(context,
                                             router_id,
                                             bind.l3_agent)
            return
    active_l3_agents = self.get_l3_agents(context, active=True)
    if not active_l3_agents:
        LOG.warn(_('No active L3 agents'))
        return
    sync_router = self.get_router(context, router_id)
    snat_candidates = self.get_snat_candidates(sync_router,
                                               active_l3_agents)
    if snat_candidates:
        self.bind_snat_servicenode(context, router_id, snat_candidates)
    else:
        self.unbind_snat_servicenode(context, router_id)

View File

@ -62,14 +62,11 @@ class L3_NAT_db_mixin(l3_db.L3_NAT_db_mixin):
# method is overriden in child classes
return router
def _build_routers_list(self, routers, gw_ports):
gw_port_id_gw_port_dict = {}
for gw_port in gw_ports:
gw_port_id_gw_port_dict[gw_port['id']] = gw_port
def _build_routers_list(self, context, routers, gw_ports):
for rtr in routers:
gw_port_id = rtr['gw_port_id']
if gw_port_id:
rtr['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
rtr['gw_port'] = gw_ports[gw_port_id]
# Add enable_snat key
rtr['enable_snat'] = rtr[EXTERNAL_GW_INFO]['enable_snat']
return routers

View File

@ -69,6 +69,16 @@ class L3RpcCallbackMixin(object):
for router in routers:
LOG.debug(_("Checking router: %(id)s for host: %(host)s"),
{'id': router['id'], 'host': host})
if router.get('gw_port') and router.get('distributed'):
self._ensure_host_set_on_port(context, plugin,
router.get('gw_port_host'),
router.get('gw_port'),
router['id'])
for p in router.get(constants.SNAT_ROUTER_INTF_KEY, []):
self._ensure_host_set_on_port(context, plugin,
router.get('gw_port_host'),
p, router['id'])
else:
self._ensure_host_set_on_port(context, plugin, host,
router.get('gw_port'),
router['id'])

View File

@ -0,0 +1,62 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""L3 scheduler additions to support DVR
Revision ID: 5589aa32bf80
Revises: 31d7f831a591
Create Date: 2014-07-7 11:00:43.392912
"""
# revision identifiers, used by Alembic.
revision = '5589aa32bf80'
down_revision = '31d7f831a591'
migration_for_plugins = [
'*'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
    """Create the csnat_l3_agent_bindings table used by DVR SNAT."""
    # migration_for_plugins is ['*']: run for every configured plugin.
    if not migration.should_run(active_plugins, migration_for_plugins):
        return

    op.create_table(
        'csnat_l3_agent_bindings',
        sa.Column('router_id', sa.String(length=36), nullable=False),
        sa.Column('l3_agent_id', sa.String(length=36), nullable=False),
        sa.Column('host_id', sa.String(length=255), nullable=True),
        sa.Column('csnat_gw_port_id', sa.String(length=36), nullable=True),
        sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'],
                                ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
                                ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['csnat_gw_port_id'], ['ports.id'],
                                ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('router_id')
    )
def downgrade(active_plugins=None, options=None):
    """Drop the csnat_l3_agent_bindings table."""
    if not migration.should_run(active_plugins, migration_for_plugins):
        return

    op.drop_table('csnat_l3_agent_bindings')

View File

@ -1 +1 @@
31d7f831a591
5589aa32bf80

View File

@ -32,6 +32,7 @@ from neutron.db.firewall import firewall_db # noqa
from neutron.db import l3_agentschedulers_db # noqa
from neutron.db import l3_attrs_db # noqa
from neutron.db import l3_db # noqa
from neutron.db import l3_dvrscheduler_db # noqa
from neutron.db import l3_gwmode_db # noqa
from neutron.db.loadbalancer import loadbalancer_db # noqa
from neutron.db.metering import metering_db # noqa

View File

@ -177,6 +177,11 @@ class RouterNotHostedByL3Agent(exceptions.Conflict):
" by L3 agent %(agent_id)s.")
class RouterL3AgentMismatch(exceptions.Conflict):
    """Raised when a router type is incompatible with an agent's mode.

    E.g. scheduling a distributed router to a legacy-mode l3 agent, or
    a centralized router to a dvr-mode agent.
    """
    message = _("Cannot host %(router_type)s router %(router_id)s "
                "on %(agent_mode)s L3 agent %(agent_id)s.")
class L3AgentSchedulerPluginBase(object):
"""REST API to operate the l3 agent scheduler.

View File

@ -28,6 +28,7 @@ from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
@ -214,7 +215,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# TODO(rkukura): Implement filtering.
return nets
def _process_port_binding(self, mech_context, attrs):
def _process_port_binding(self, mech_context, context, attrs):
binding = mech_context._binding
port = mech_context.current
changes = False
@ -224,6 +225,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding.host != host):
binding.host = host
changes = True
if "compute:" in port['device_owner']:
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
if (attributes.is_attr_set(vnic_type) and
@ -796,7 +803,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result,
network, binding)
self._process_port_binding(mech_context, attrs)
self._process_port_binding(mech_context, context, attrs)
result[addr_pair.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs(
context, result,
@ -858,7 +866,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self, context, updated_port, network, binding,
original_port=original_port)
need_port_update_notify |= self._process_port_binding(
mech_context, attrs)
mech_context, context, attrs)
self.mechanism_manager.update_port_precommit(mech_context)
# TODO(apech) - handle errors raised by update_port, potentially
@ -959,8 +967,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def delete_port(self, context, id, l3_port_check=True):
LOG.debug(_("Deleting port %s"), id)
removed_routers = []
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
is_dvr_enabled = utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
if l3plugin and l3_port_check:
l3plugin.prevent_l3_port_deletion(context, id)
@ -990,11 +1001,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
else:
mech_context = driver_context.PortContext(self, context, port,
network, binding)
if "compute:" in port['device_owner'] and is_dvr_enabled:
router_info = l3plugin.dvr_deletens_if_no_vm(context, id)
removed_routers += router_info
self.mechanism_manager.delete_port_precommit(mech_context)
self._delete_port_security_group_bindings(context, id)
if l3plugin:
router_ids = l3plugin.disassociate_floatingips(
context, id, do_notify=False)
if is_dvr_enabled:
l3plugin.dvr_vmarp_table_update(context, id, "del")
LOG.debug("Calling delete_port for %(port_id)s owned by %(owner)s"
% {"port_id": id, "owner": port['device_owner']})
@ -1003,6 +1019,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# now that we've left db transaction, we are safe to notify
if l3plugin:
l3plugin.notify_routers_updated(context, router_ids)
for router in removed_routers:
l3plugin.remove_router_from_l3_agent(
context, router['host'], router['router_id'])
try:
# for both normal and DVR Interface ports, only one invocation of

View File

@ -271,11 +271,11 @@ class L3AgentSchedulerDbMixin(l3_agentschedulers_db.L3AgentSchedulerDbMixin):
return super(L3AgentSchedulerDbMixin, self).auto_schedule_routers(
context, host, router_ids)
def schedule_router(self, context, router):
def schedule_router(self, context, router, candidates=None, hints=None):
if (self._get_provider_by_router_id(context, router) ==
nconst.ROUTER_PROVIDER_L3AGENT):
return super(L3AgentSchedulerDbMixin, self).schedule_router(
context, router)
context, router, candidates=candidates, hints=hints)
def add_router_to_l3_agent(self, context, id, router_id):
provider = self._get_provider_by_router_id(context, router_id)

View File

@ -35,13 +35,23 @@ LOG = logging.getLogger(__name__)
class L3Scheduler(object):
@abc.abstractmethod
def schedule(self, plugin, context, router_id, candidates=None):
def schedule(self, plugin, context, router_id,
candidates=None, hints=None):
"""Schedule the router to an active L3 agent.
Schedule the router only if it is not already scheduled.
"""
pass
def dvr_has_binding(self, context, router_id, l3_agent_id):
    """Return True if the router is already bound to the given l3 agent."""
    binding_model = l3_agentschedulers_db.RouterL3AgentBinding
    matches = (context.session.query(binding_model).
               filter(binding_model.router_id == router_id,
                      binding_model.l3_agent_id == l3_agent_id).
               count())
    return matches > 0
def auto_schedule_routers(self, plugin, context, host, router_ids):
"""Schedule non-hosted routers to L3 Agent running on host.
@ -69,18 +79,20 @@ class L3Scheduler(object):
LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
# check if each of the specified routers is hosted
if router_ids:
unscheduled_router_ids = []
for router_id in router_ids:
routers = plugin.get_routers(
context, filters={'id': router_ids})
unscheduled_routers = []
for router in routers:
l3_agents = plugin.get_l3_agents_hosting_routers(
context, [router_id], admin_state_up=True)
if l3_agents:
context, [router['id']], admin_state_up=True)
if l3_agents and not router.get('distributed', False):
LOG.debug(_('Router %(router_id)s has already been'
' hosted by L3 agent %(agent_id)s'),
{'router_id': router_id,
{'router_id': router['id'],
'agent_id': l3_agents[0]['id']})
else:
unscheduled_router_ids.append(router_id)
if not unscheduled_router_ids:
unscheduled_routers.append(router)
if not unscheduled_routers:
# all (specified) routers are already scheduled
return False
else:
@ -95,27 +107,36 @@ class L3Scheduler(object):
if not unscheduled_router_ids:
LOG.debug(_('No non-hosted routers'))
return False
unscheduled_routers = plugin.get_routers(
context, filters={'id': unscheduled_router_ids})
# check if the configuration of l3 agent is compatible
# with the router
routers = plugin.get_routers(
context, filters={'id': unscheduled_router_ids})
to_removed_ids = []
for router in routers:
candidates = plugin.get_l3_agent_candidates(router, [l3_agent])
to_removed_ids = set()
for router in unscheduled_routers:
candidates = plugin.get_l3_agent_candidates(context,
router,
[l3_agent])
if not candidates:
to_removed_ids.append(router['id'])
router_ids = set([r['id'] for r in routers]) - set(to_removed_ids)
if not router_ids:
to_removed_ids.add(router['id'])
target_routers = [r for r in unscheduled_routers
if r['id'] not in to_removed_ids]
if not target_routers:
LOG.warn(_('No routers compatible with L3 agent configuration'
' on host %s'), host)
return False
for router_id in router_ids:
self.bind_router(context, router_id, l3_agent)
for router_dict in target_routers:
if (router_dict.get('distributed', False)
and self.dvr_has_binding(context,
router_dict['id'],
l3_agent.id)):
continue
self.bind_router(context, router_dict['id'], l3_agent)
return True
def get_candidates(self, plugin, context, sync_router):
def get_candidates(self, plugin, context, sync_router, subnet_id):
"""Return L3 agents where a router could be scheduled."""
with context.session.begin(subtransactions=True):
# allow one router is hosted by just
@ -124,7 +145,7 @@ class L3Scheduler(object):
# active any time
l3_agents = plugin.get_l3_agents_hosting_routers(
context, [sync_router['id']], admin_state_up=True)
if l3_agents:
if l3_agents and not sync_router.get('distributed', False):
LOG.debug(_('Router %(router_id)s has already been hosted'
' by L3 agent %(agent_id)s'),
{'router_id': sync_router['id'],
@ -135,8 +156,16 @@ class L3Scheduler(object):
if not active_l3_agents:
LOG.warn(_('No active L3 agents'))
return
candidates = plugin.get_l3_agent_candidates(sync_router,
active_l3_agents)
new_l3agents = plugin.get_l3_agent_candidates(context,
sync_router,
active_l3_agents,
subnet_id)
old_l3agentset = set(l3_agents)
if sync_router.get('distributed', False):
new_l3agentset = set(new_l3agents)
candidates = list(new_l3agentset - old_l3agentset)
else:
candidates = new_l3agents
if not candidates:
LOG.warn(_('No L3 agents can host the router %s'),
sync_router['id'])
@ -163,38 +192,56 @@ class L3Scheduler(object):
'%(agent_id)s', {'router_id': router_id,
'agent_id': chosen_agent.id})
    def _schedule_router(self, plugin, context, router_id,
                         candidates=None, hints=None):
        """Schedule a router to L3 agent(s), honoring DVR semantics.

        :param plugin: L3 service plugin, used for router lookup and
            SNAT scheduling.
        :param context: request context.
        :param router_id: id of the router to schedule.
        :param candidates: optional pre-computed candidate agent list;
            when absent, candidates are computed via get_candidates().
        :param hints: optional dict; 'subnet_id' narrows candidate
            selection, and the presence of the 'gw_exists' key triggers
            SNAT scheduling for distributed routers.
        :returns: the chosen agent (the last one bound, for distributed
            routers), or None when there are no candidates.
        """
        sync_router = plugin.get_router(context, router_id)
        subnet_id = hints.get('subnet_id') if hints else None
        candidates = candidates or self.get_candidates(
            plugin, context, sync_router, subnet_id)
        # NOTE: SNAT scheduling for a distributed router happens before the
        # empty-candidates check on purpose — centralized SNAT must be
        # placed even when no new agent candidates remain for this router.
        if (hints and 'gw_exists' in hints
            and sync_router.get('distributed', False)):
            plugin.schedule_snat_router(context, router_id, sync_router)
        if not candidates:
            return
        if sync_router.get('distributed', False):
            # A distributed router is bound to every candidate agent.
            for chosen_agent in candidates:
                self.bind_router(context, router_id, chosen_agent)
        else:
            # A centralized router goes to exactly one agent, picked by
            # the subclass policy (random / least-routers).
            chosen_agent = self._choose_router_agent(
                plugin, context, candidates)
            self.bind_router(context, router_id, chosen_agent)
        return chosen_agent
    @abc.abstractmethod
    def _choose_router_agent(self, plugin, context, candidates):
        """Choose an agent from candidates based on a specific policy.

        Subclasses implement the selection strategy (e.g. random choice or
        least-loaded agent).  Called by _schedule_router() only for
        non-distributed routers, with a non-empty candidate list.
        """
        pass
class ChanceScheduler(L3Scheduler):
"""Randomly allocate an L3 agent for a router."""
def schedule(self, plugin, context, router_id, candidates=None):
def schedule(self, plugin, context, router_id,
candidates=None, hints=None):
with context.session.begin(subtransactions=True):
sync_router = plugin.get_router(context, router_id)
candidates = candidates or self.get_candidates(
plugin, context, sync_router)
if not candidates:
return
return self._schedule_router(
plugin, context, router_id, candidates=candidates, hints=hints)
chosen_agent = random.choice(candidates)
self.bind_router(context, router_id, chosen_agent)
return chosen_agent
def _choose_router_agent(self, plugin, context, candidates):
return random.choice(candidates)
class LeastRoutersScheduler(L3Scheduler):
"""Allocate to an L3 agent with the least number of routers bound."""
def schedule(self, plugin, context, router_id, candidates=None):
def schedule(self, plugin, context, router_id,
candidates=None, hints=None):
with context.session.begin(subtransactions=True):
sync_router = plugin.get_router(context, router_id)
candidates = candidates or self.get_candidates(
plugin, context, sync_router)
if not candidates:
return
return self._schedule_router(
plugin, context, router_id, candidates=candidates, hints=hints)
def _choose_router_agent(self, plugin, context, candidates):
candidate_ids = [candidate['id'] for candidate in candidates]
chosen_agent = plugin.get_l3_agent_with_min_routers(
context, candidate_ids)
self.bind_router(context, router_id, chosen_agent)
return chosen_agent

View File

@ -24,8 +24,8 @@ from neutron.common import topics
from neutron.db import api as qdbapi
from neutron.db import common_db_mixin
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_dvr_db
from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import model_base
@ -45,7 +45,7 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin):
l3_dvrscheduler_db.L3_DVRsch_db_mixin):
"""Implementation of the Neutron L3 Router Service Plugin.

View File

@ -200,6 +200,10 @@ class TestL3GwModeMixin(base.BaseTestCase):
self.fip_request = {'port_id': FAKE_FIP_INT_PORT_ID,
'tenant_id': self.tenant_id}
def _get_gwports_dict(self, gw_ports):
return dict((gw_port['id'], gw_port)
for gw_port in gw_ports)
def _reset_ext_gw(self):
# Reset external gateway
self.router.gw_port_id = None
@ -253,7 +257,9 @@ class TestL3GwModeMixin(base.BaseTestCase):
def test_build_routers_list_no_ext_gw(self):
self._reset_ext_gw()
router_dict = self.target_object._make_router_dict(self.router)
routers = self.target_object._build_routers_list([router_dict], [])
routers = self.target_object._build_routers_list(self.context,
[router_dict],
[])
self.assertEqual(1, len(routers))
router = routers[0]
self.assertIsNone(router.get('gw_port'))
@ -262,7 +268,8 @@ class TestL3GwModeMixin(base.BaseTestCase):
def test_build_routers_list_with_ext_gw(self):
router_dict = self.target_object._make_router_dict(self.router)
routers = self.target_object._build_routers_list(
[router_dict], [self.router.gw_port])
self.context, [router_dict],
self._get_gwports_dict([self.router.gw_port]))
self.assertEqual(1, len(routers))
router = routers[0]
self.assertIsNotNone(router.get('gw_port'))
@ -273,7 +280,8 @@ class TestL3GwModeMixin(base.BaseTestCase):
self.router.enable_snat = False
router_dict = self.target_object._make_router_dict(self.router)
routers = self.target_object._build_routers_list(
[router_dict], [self.router.gw_port])
self.context, [router_dict],
self._get_gwports_dict([self.router.gw_port]))
self.assertEqual(1, len(routers))
router = routers[0]
self.assertIsNotNone(router.get('gw_port'))

View File

@ -28,10 +28,13 @@ from neutron.common import topics
from neutron import context as q_context
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db
from neutron.db import l3_dvrscheduler_db
from neutron.extensions import l3 as ext_l3
from neutron import manager
from neutron.openstack.common import timeutils
from neutron.scheduler import l3_agent_scheduler
from neutron.tests import base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
@ -239,3 +242,142 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCase):
agent_id3 = agents[0]['id']
self.assertNotEqual(agent_id1, agent_id3)
class L3DvrScheduler(l3_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin):
    """Concrete device-under-test combining L3 DB and DVR scheduler mixins."""
    pass
class L3DvrSchedulerTestCase(base.BaseTestCase):
def setUp(self):
plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.setup_coreplugin(plugin)
super(L3DvrSchedulerTestCase, self).setUp()
self.adminContext = q_context.get_admin_context()
self.dut = L3DvrScheduler()
    def test_dvr_update_router_addvm(self):
        """dvr_update_router_addvm runs cleanly for a VM port on a DVR subnet.

        The DB layer is mocked so the scheduler sees one distributed router
        interface on the VM's subnet; the test only asserts (implicitly, by
        not raising) that the update path completes.  NOTE(review): there is
        no explicit assertion here — consider verifying the notifier cast.
        """
        # VM (compute) port landing on the DVR-served subnet.
        port = {
            'device_id': 'abcd',
            'device_owner': 'compute:nova',
            'fixed_ips': [
                {
                    'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
                    'ip_address': '10.10.10.3'
                }
            ]
        }
        # Distributed router interface port on the same subnet.
        dvr_port = {
            'id': 'dvr_port1',
            'device_id': 'r1',
            'device_owner': 'network:router_interface_distributed',
            'fixed_ips': [
                {
                    'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
                    'ip_address': '10.10.10.1'
                }
            ]
        }
        r1 = {
            'id': 'r1',
            'distributed': True,
        }
        # Patch DB/plugin/RPC boundaries so only scheduler logic executes.
        with contextlib.nested(
            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
                       '.get_ports', return_value=[dvr_port]),
            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
                       return_value=mock.Mock()),
            mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.get_router',
                       return_value=r1),
            mock.patch('neutron.api.rpc.agentnotifiers.l3_rpc_agent_api'
                       '.L3AgentNotifyAPI')):
            self.dut.dvr_update_router_addvm(self.adminContext, port)
    def test_get_dvr_routers_by_vmportid(self):
        """VM port id resolves to the distributed router on its subnet.

        get_port/get_ports are mocked to return a single DVR interface
        port owned by router 'r1', so the lookup must yield {'r1'}.
        """
        dvr_port = {
            'id': 'dvr_port1',
            'device_id': 'r1',
            'device_owner': 'network:router_interface_distributed',
            'fixed_ips': [
                {
                    'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
                    'ip_address': '10.10.10.1'
                }
            ]
        }
        r1 = {
            'id': 'r1',
            'distributed': True,
        }
        with contextlib.nested(
            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
                       '.get_port', return_value=dvr_port),
            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
                       '.get_ports', return_value=[dvr_port])):
            router_id = self.dut.get_dvr_routers_by_vmportid(self.adminContext,
                                                             dvr_port['id'])
            # Result is a set-like collection; its single member is 'r1'.
            self.assertEqual(router_id.pop(), r1['id'])
    def test_get_subnet_ids_on_router(self):
        """Subnet ids attached to a router come from its interface ports.

        With get_ports mocked to one DVR interface port, the returned set
        must contain exactly that port's fixed-ip subnet_id.
        """
        dvr_port = {
            'id': 'dvr_port1',
            'device_id': 'r1',
            'device_owner': 'network:router_interface_distributed',
            'fixed_ips': [
                {
                    'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
                    'ip_address': '10.10.10.1'
                }
            ]
        }
        r1 = {
            'id': 'r1',
            'distributed': True,
        }
        with contextlib.nested(
            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
                       '.get_ports', return_value=[dvr_port])):
            sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
                                                        r1['id'])
            # NOTE: pop(0) mutates dvr_port['fixed_ips'] — harmless here
            # since the dict is local to this test.
            self.assertEqual(sub_ids.pop(),
                             dvr_port.get('fixed_ips').pop(0).get('subnet_id'))
    def test_check_vm_exists_on_subnet(self):
        """No *other* VM remains on the subnet once the given port is ignored.

        The only ACTIVE compute port returned by the mocked get_ports is
        'dvr_port1' itself, which is also the port id passed to the check —
        presumably check_vm_exists_on_subnet excludes that port, hence the
        expected False.  TODO(review): confirm the exclusion semantics
        against the implementation.
        """
        dvr_port = {
            'id': 'dvr_port1',
            'device_id': 'r1',
            'status': 'ACTIVE',
            'binding:host_id': 'thisHost',
            'device_owner': 'compute:nova',
            'fixed_ips': [
                {
                    'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
                    'ip_address': '10.10.10.1'
                }
            ]
        }
        r1 = {
            'id': 'r1',
            'distributed': True,
        }
        with contextlib.nested(
            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
                       '.get_ports', return_value=[dvr_port]),
            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
                       return_value=mock.Mock()),
            mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.get_router',
                       return_value=r1),
            mock.patch('neutron.api.rpc.agentnotifiers.l3_rpc_agent_api'
                       '.L3AgentNotifyAPI')):
            sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
                                                        r1['id'])
            result = self.dut.check_vm_exists_on_subnet(
                self.adminContext,
                'thisHost', 'dvr_port1',
                sub_ids)
            self.assertFalse(result)