Add scheduling feature basing on agent management extension

3rd part of blueprint quantum-scheduler

1. Allow networks to be hosted by certain dhcp agents.
Network to dhcp agent is a
many to many relationship. Provide a simple
scheduler to schedule a network randomly
to an active dhcp agent when a network or port is created.
2. Allow admin user to (de)schedule network to a
certain dhcp agent manually.
3. Allow routers to be hosted by a certain l3 agent.
Router to l3 agent is a many to one relationship.
Provide a simple scheduler to
schedule a router to l3 agent if the router is not
scheduled when the router is  updated.
4. Auto schedule networks and routers to agents when agents
start.
5. Only support ovs plugin at this point

Change-Id: Iddec3ea9d4c0fe2d51a59f7db47145722fc5a1cd
This commit is contained in:
gongysh 2013-02-22 23:34:57 +08:00
parent 598483f0fb
commit 0070b452f1
29 changed files with 2230 additions and 157 deletions

View File

@ -58,5 +58,14 @@
"update_agent": "rule:admin_only", "update_agent": "rule:admin_only",
"delete_agent": "rule:admin_only", "delete_agent": "rule:admin_only",
"get_agent": "rule:admin_only", "get_agent": "rule:admin_only",
"get_agents": "rule:admin_only" "get_agents": "rule:admin_only",
"create_dhcp-network": "rule:admin_only",
"delete_dhcp-network": "rule:admin_only",
"get_dhcp-networks": "rule:admin_only",
"create_l3-router": "rule:admin_only",
"delete_l3-router": "rule:admin_only",
"get_l3-routers": "rule:admin_only",
"get_dhcp-agents": "rule:admin_only",
"get_l3-agents": "rule:admin_only"
} }

View File

@ -198,6 +198,27 @@ notification_topics = notifications
# Maximum number of fixed ips per port # Maximum number of fixed ips per port
# max_fixed_ips_per_port = 5 # max_fixed_ips_per_port = 5
# =========== items for agent management extension =============
# Seconds to regard the agent as down.
# agent_down_time = 5
# =========== end of items for agent management extension =====
# =========== items for agent scheduler extension =============
# Driver to use for scheduling network to DHCP agent
# network_scheduler_driver = quantum.scheduler.dhcp_agent_scheduler.ChanceScheduler
# Driver to use for scheduling router to a default L3 agent
# router_scheduler_driver = quantum.scheduler.l3_agent_scheduler.ChanceScheduler
# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
# networks to first DHCP agent which sends get_active_networks message to
# quantum server
# network_auto_schedule = True
# Allow auto scheduling routers to L3 agent. It will schedule non-hosted
# routers to first L3 agent which sends sync_routers message to quantum server
# router_auto_schedule = True
# =========== end of items for agent scheduler extension =====
[QUOTAS] [QUOTAS]
# resource name(s) that are supported in quota features # resource name(s) that are supported in quota features
# quota_items = network,subnet,port # quota_items = network,subnet,port
@ -217,11 +238,6 @@ notification_topics = notifications
# default driver to use for quota checks # default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver # quota_driver = quantum.quota.ConfDriver
# =========== items for agent management extension =============
# Seconds to regard the agent as down.
# agent_down_time = 5
# =========== end of items for agent management extension =====
[DEFAULT_SERVICETYPE] [DEFAULT_SERVICETYPE]
# Description of the default service type (optional) # Description of the default service type (optional)
# description = "default service type" # description = "default service type"

View File

@ -320,7 +320,7 @@ class DhcpPluginApi(proxy.RpcProxy):
super(DhcpPluginApi, self).__init__( super(DhcpPluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION) topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context self.context = context
self.host = socket.gethostname() self.host = cfg.CONF.host
def get_active_networks(self): def get_active_networks(self):
"""Make a remote process call to retrieve the active networks.""" """Make a remote process call to retrieve the active networks."""
@ -685,6 +685,11 @@ class DhcpAgentWithStateReport(DhcpAgent):
if self.agent_state.pop('start_flag', None): if self.agent_state.pop('start_flag', None):
self.run() self.run()
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.needs_resync = True
LOG.info(_("agent_updated by server side %s!"), payload)
def after_start(self): def after_start(self):
LOG.info(_("DHCP agent started")) LOG.info(_("DHCP agent started"))

View File

@ -93,7 +93,7 @@ class L3PluginApi(proxy.RpcProxy):
class RouterInfo(object): class RouterInfo(object):
def __init__(self, router_id, root_helper, use_namespaces, router=None): def __init__(self, router_id, root_helper, use_namespaces, router):
self.router_id = router_id self.router_id = router_id
self.ex_gw_port = None self.ex_gw_port = None
self.internal_ports = [] self.internal_ports = []
@ -227,7 +227,7 @@ class L3NATAgent(manager.Manager):
else: else:
raise raise
def _router_added(self, router_id, router=None): def _router_added(self, router_id, router):
ri = RouterInfo(router_id, self.root_helper, ri = RouterInfo(router_id, self.root_helper,
self.conf.use_namespaces, router) self.conf.use_namespaces, router)
self.router_info[router_id] = ri self.router_info[router_id] = ri
@ -242,6 +242,10 @@ class L3NATAgent(manager.Manager):
def _router_removed(self, router_id): def _router_removed(self, router_id):
ri = self.router_info[router_id] ri = self.router_info[router_id]
ri.router['gw_port'] = None
ri.router[l3_constants.INTERFACE_KEY] = []
ri.router[l3_constants.FLOATINGIP_KEY] = []
self.process_router(ri)
for c, r in self.metadata_filter_rules(): for c, r in self.metadata_filter_rules():
ri.iptables_manager.ipv4['filter'].remove_rule(c, r) ri.iptables_manager.ipv4['filter'].remove_rule(c, r)
for c, r in self.metadata_nat_rules(): for c, r in self.metadata_nat_rules():
@ -568,7 +572,13 @@ class L3NATAgent(manager.Manager):
LOG.debug(msg) LOG.debug(msg)
self.fullsync = True self.fullsync = True
def _process_routers(self, routers): def router_removed_from_agent(self, context, payload):
self.router_deleted(context, payload['router_id'])
def router_added_to_agent(self, context, payload):
self.routers_updated(context, payload)
def _process_routers(self, routers, all_routers=False):
if (self.conf.external_network_bridge and if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)): not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("The external network bridge '%s' does not exist"), LOG.error(_("The external network bridge '%s' does not exist"),
@ -576,7 +586,17 @@ class L3NATAgent(manager.Manager):
return return
target_ex_net_id = self._fetch_external_net_id() target_ex_net_id = self._fetch_external_net_id()
# if routers are all the routers we have (They are from router sync on
# starting or when error occurs during running), we seek the
# routers which should be removed.
# If routers are from server side notification, we seek them
# from subset of incoming routers and ones we have now.
if all_routers:
prev_router_ids = set(self.router_info)
else:
prev_router_ids = set(self.router_info) & set(
[router['id'] for router in routers])
cur_router_ids = set()
for r in routers: for r in routers:
if not r['admin_state_up']: if not r['admin_state_up']:
continue continue
@ -593,13 +613,15 @@ class L3NATAgent(manager.Manager):
if ex_net_id and ex_net_id != target_ex_net_id: if ex_net_id and ex_net_id != target_ex_net_id:
continue continue
cur_router_ids.add(r['id'])
if r['id'] not in self.router_info: if r['id'] not in self.router_info:
self._router_added(r['id']) self._router_added(r['id'], r)
ri = self.router_info[r['id']] ri = self.router_info[r['id']]
ri.router = r ri.router = r
self.process_router(ri) self.process_router(ri)
# identify and remove routers that no longer exist
for router_id in prev_router_ids - cur_router_ids:
self._router_removed(router_id)
@periodic_task.periodic_task @periodic_task.periodic_task
def _sync_routers_task(self, context): def _sync_routers_task(self, context):
@ -613,8 +635,7 @@ class L3NATAgent(manager.Manager):
router_id = None router_id = None
routers = self.plugin_rpc.get_routers( routers = self.plugin_rpc.get_routers(
context, router_id) context, router_id)
self.router_info = {} self._process_routers(routers, all_routers=True)
self._process_routers(routers)
self.fullsync = False self.fullsync = False
except Exception: except Exception:
LOG.exception(_("Failed synchronizing routers")) LOG.exception(_("Failed synchronizing routers"))
@ -704,6 +725,11 @@ class L3NATAgentWithStateReport(L3NATAgent):
except Exception: except Exception:
LOG.exception(_("Failed reporting state!")) LOG.exception(_("Failed reporting state!"))
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.fullsync = True
LOG.info(_("agent_updated by server side %s!"), payload)
def main(): def main():
eventlet.monkey_patch() eventlet.monkey_patch()

View File

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from quantum.common import constants
from quantum.common import topics from quantum.common import topics
from quantum.common import utils
from quantum import manager
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy from quantum.openstack.common.rpc import proxy
@ -40,11 +43,35 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
super(DhcpAgentNotifyAPI, self).__init__( super(DhcpAgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION) topic=topic, default_version=self.BASE_RPC_API_VERSION)
def _notification(self, context, method, payload): def _get_dhcp_agents(self, context, network_id):
plugin = manager.QuantumManager.get_plugin()
dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
context, [network_id], active=True)
return [(dhcp_agent.host, dhcp_agent.topic) for
dhcp_agent in dhcp_agents]
def _notification_host(self, context, method, payload, host):
"""Notify the agent on host"""
self.cast(
context, self.make_msg(method,
payload=payload),
topic='%s.%s' % (topics.DHCP_AGENT, host))
def _notification(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network""" """Notify all the agents that are hosting the network"""
# By now, we have no scheduling feature, so we fanout plugin = manager.QuantumManager.get_plugin()
# to all of the DHCP agents if (method != 'network_delete_end' and utils.is_extension_supported(
self._notification_fanout(context, method, payload) plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)):
for (host, topic) in self._get_dhcp_agents(context, network_id):
self.cast(
context, self.make_msg(method,
payload=payload),
topic='%s.%s' % (topic, host))
else:
# besides the non-agentscheduler plugin,
# There is no way to query who is hosting the network
# when the network is deleted, so we need to fanout
self._notification_fanout(context, method, payload)
def _notification_fanout(self, context, method, payload): def _notification_fanout(self, context, method, payload):
"""Fanout the payload to all dhcp agents""" """Fanout the payload to all dhcp agents"""
@ -53,6 +80,19 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
payload=payload), payload=payload),
topic=topics.DHCP_AGENT) topic=topics.DHCP_AGENT)
def network_removed_from_agent(self, context, network_id, host):
self._notification_host(context, 'network_delete_end',
{'network_id': network_id}, host)
def network_added_to_agent(self, context, network_id, host):
self._notification_host(context, 'network_create_end',
{'network': {'id': network_id}}, host)
def agent_updated(self, context, admin_state_up, host):
self._notification_host(context, 'agent_updated',
{'admin_state_up': admin_state_up},
host)
def notify(self, context, data, methodname): def notify(self, context, data, methodname):
# data is {'key' : 'value'} with only one key # data is {'key' : 'value'} with only one key
if methodname not in self.VALID_METHOD_NAMES: if methodname not in self.VALID_METHOD_NAMES:
@ -61,10 +101,18 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
if obj_type not in self.VALID_RESOURCES: if obj_type not in self.VALID_RESOURCES:
return return
obj_value = data[obj_type] obj_value = data[obj_type]
network_id = None
if obj_type == 'network' and 'id' in obj_value:
network_id = obj_value['id']
elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
network_id = obj_value['network_id']
if not network_id:
return
methodname = methodname.replace(".", "_") methodname = methodname.replace(".", "_")
if methodname.endswith("_delete_end"): if methodname.endswith("_delete_end"):
if 'id' in obj_value: if 'id' in obj_value:
self._notification(context, methodname, self._notification(context, methodname,
{obj_type + '_id': obj_value['id']}) {obj_type + '_id': obj_value['id']},
network_id)
else: else:
self._notification(context, methodname, data) self._notification(context, methodname, data, network_id)

View File

@ -0,0 +1,120 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum.common import constants
from quantum.common import topics
from quantum.common import utils
from quantum import manager
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class L3AgentNotifyAPI(proxy.RpcProxy):
"""API for plugin to notify L3 agent."""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.L3_AGENT):
super(L3AgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def _notification_host(self, context, method, payload, host):
"""Notify the agent that is hosting the router"""
LOG.debug(_('Nofity agent at %(host)s the message '
'%(method)s'), {'host': host,
'method': method})
self.cast(
context, self.make_msg(method,
payload=payload),
topic='%s.%s' % (topics.L3_AGENT, host))
def _agent_notification(self, context, method, routers,
operation, data):
"""Notify changed routers to hosting l3 agents.
Adjust routers according to l3 agents' role and
related dhcp agents.
Notify dhcp agent to get right subnet's gateway ips.
"""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.QuantumManager.get_plugin()
for router in routers:
l3_agents = plugin.get_l3_agents_hosting_routers(
adminContext, [router['id']],
admin_state_up=True,
active=True)
for l3_agent in l3_agents:
LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
'%(method)s'),
{'topic': l3_agent.topic,
'host': l3_agent.host,
'method': method})
self.cast(
context, self.make_msg(method,
routers=[router]),
topic='%s.%s' % (l3_agent.topic, l3_agent.host))
def _notification(self, context, method, routers, operation, data):
"""Notify all the agents that are hosting the routers"""
plugin = manager.QuantumManager.get_plugin()
if utils.is_extension_supported(
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
adminContext = (context.is_admin and
context or context.elevated())
plugin.schedule_routers(adminContext, routers)
self._agent_notification(
context, method, routers, operation, data)
else:
self.fanout_cast(
context, self.make_msg(method,
routers=routers),
topic=topics.L3_AGENT)
def _notification_fanout(self, context, method, router_id):
"""Fanout the deleted router to all L3 agents"""
LOG.debug(_('Fanout notify agent at %(topic)s the message '
'%(method)s on router %(router_id)s'),
{'topic': topics.DHCP_AGENT,
'method': method,
'router_id': router_id})
self.fanout_cast(
context, self.make_msg(method,
router_id=router_id),
topic=topics.L3_AGENT)
def agent_updated(self, context, admin_state_up, host):
self._notification_host(context, 'agent_updated',
{'admin_state_up': admin_state_up},
host)
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)
def routers_updated(self, context, routers, operation=None, data=None):
if routers:
self._notification(context, 'routers_updated', routers,
operation, data)
def router_removed_from_agent(self, context, router_id, host):
self._notification_host(context, 'router_removed_from_agent',
{'router_id': router_id}, host)
def router_added_to_agent(self, context, routers, host):
self._notification_host(context, 'router_added_to_agent',
routers, host)
L3AgentNotify = L3AgentNotifyAPI()

View File

@ -66,3 +66,5 @@ PAGINATION_INFINITE = 'infinite'
SORT_DIRECTION_ASC = 'asc' SORT_DIRECTION_ASC = 'asc'
SORT_DIRECTION_DESC = 'desc' SORT_DIRECTION_DESC = 'desc'
AGENT_SCHEDULER_EXT_ALIAS = 'agent_scheduler'

View File

@ -183,3 +183,8 @@ def diff_list_of_dict(old_list, new_list):
added = new_set - old_set added = new_set - old_set
removed = old_set - new_set removed = old_set - new_set
return [str2dict(a) for a in added], [str2dict(r) for r in removed] return [str2dict(a) for a in added], [str2dict(r) for r in removed]
def is_extension_supported(plugin, ext_alias):
return ext_alias in getattr(
plugin, "supported_extension_aliases", [])

View File

@ -67,24 +67,30 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
raise ext_agent.AgentNotFound(id=id) raise ext_agent.AgentNotFound(id=id)
return agent return agent
def _is_agent_down(self, heart_beat_time_str): @classmethod
return timeutils.is_older_than(heart_beat_time_str, def is_agent_down(cls, heart_beat_time):
return timeutils.is_older_than(heart_beat_time,
cfg.CONF.agent_down_time) cfg.CONF.agent_down_time)
def get_configuration_dict(self, agent_db):
try:
conf = jsonutils.loads(agent_db.configurations)
except Exception:
msg = _('Configuration for agent %(agent_type)s on host %(host)s'
' is invalid.')
LOG.warn(msg, {'agent_type': agent_db.agent_type,
'host': agent_db.host})
conf = {}
return conf
def _make_agent_dict(self, agent, fields=None): def _make_agent_dict(self, agent, fields=None):
attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get( attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
ext_agent.RESOURCE_NAME + 's') ext_agent.RESOURCE_NAME + 's')
res = dict((k, agent[k]) for k in attr res = dict((k, agent[k]) for k in attr
if k not in ['alive', 'configurations']) if k not in ['alive', 'configurations'])
res['alive'] = not self._is_agent_down(res['heartbeat_timestamp']) res['alive'] = not AgentDbMixin.is_agent_down(
try: res['heartbeat_timestamp'])
res['configurations'] = jsonutils.loads(agent['configurations']) res['configurations'] = self.get_configuration_dict(agent)
except Exception:
msg = _('Configurations for agent %(agent_type)s on host %(host)s'
' are invalid.')
LOG.warn(msg, {'agent_type': res['agent_type'],
'host': res['host']})
res['configurations'] = {}
return self._fields(res, fields) return self._fields(res, fields)
def delete_agent(self, context, id): def delete_agent(self, context, id):
@ -99,6 +105,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
agent.update(agent_data) agent.update(agent_data)
return self._make_agent_dict(agent) return self._make_agent_dict(agent)
def get_agents_db(self, context, filters=None):
query = self._get_collection_query(context, Agent, filters=filters)
return query.all()
def get_agents(self, context, filters=None, fields=None): def get_agents(self, context, filters=None, fields=None):
return self._get_collection(context, Agent, return self._get_collection(context, Agent,
self._make_agent_dict, self._make_agent_dict,

View File

@ -0,0 +1,363 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from sqlalchemy.orm import joinedload
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.db import agents_db
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import agentscheduler
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
class NetworkDhcpAgentBinding(model_base.BASEV2):
"""Represents binding between quantum networks and DHCP agents"""
network_id = sa.Column(sa.String(36),
sa.ForeignKey("networks.id", ondelete='CASCADE'),
primary_key=True)
dhcp_agent = orm.relation(agents_db.Agent)
dhcp_agent_id = sa.Column(sa.String(36),
sa.ForeignKey("agents.id",
ondelete='CASCADE'),
primary_key=True)
class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
"""Represents binding between quantum routers and L3 agents"""
router_id = sa.Column(sa.String(36),
sa.ForeignKey("routers.id", ondelete='CASCADE'))
l3_agent = orm.relation(agents_db.Agent)
l3_agent_id = sa.Column(sa.String(36),
sa.ForeignKey("agents.id",
ondelete='CASCADE'))
class AgentSchedulerDbMixin(agentscheduler.AgentSchedulerPluginBase):
"""Mixin class to add agent scheduler extension to db_plugin_base_v2."""
dhcp_agent_notifier = None
l3_agent_notifier = None
network_scheduler = None
router_scheduler = None
def get_dhcp_agents_hosting_networks(
self, context, network_ids, active=None):
if not network_ids:
return []
query = context.session.query(NetworkDhcpAgentBinding)
query = query.options(joinedload('dhcp_agent'))
if len(network_ids) == 1:
query = query.filter(
NetworkDhcpAgentBinding.network_id == network_ids[0])
elif network_ids:
query = query.filter(
NetworkDhcpAgentBinding.network_id in network_ids)
if active is not None:
query = (query.filter(agents_db.Agent.admin_state_up == active))
dhcp_agents = [binding.dhcp_agent for binding in query.all()]
if active is not None:
dhcp_agents = [dhcp_agent for dhcp_agent in
dhcp_agents if not
agents_db.AgentDbMixin.is_agent_down(
dhcp_agent['heartbeat_timestamp'])]
return dhcp_agents
def add_network_to_dhcp_agent(self, context, id, network_id):
self._get_network(context, network_id)
with context.session.begin(subtransactions=True):
agent_db = self._get_agent(context, id)
if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or
not agent_db['admin_state_up']):
raise agentscheduler.InvalidDHCPAgent(id=id)
dhcp_agents = self.get_dhcp_agents_hosting_networks(
context, [network_id])
for dhcp_agent in dhcp_agents:
if id == dhcp_agent.id:
raise agentscheduler.NetworkHostedByDHCPAgent(
network_id=network_id, agent_id=id)
binding = NetworkDhcpAgentBinding()
binding.dhcp_agent_id = id
binding.network_id = network_id
context.session.add(binding)
if self.dhcp_agent_notifier:
self.dhcp_agent_notifier.network_added_to_agent(
context, network_id, agent_db.host)
def remove_network_from_dhcp_agent(self, context, id, network_id):
agent = self._get_agent(context, id)
with context.session.begin(subtransactions=True):
try:
query = context.session.query(NetworkDhcpAgentBinding)
binding = query.filter(
NetworkDhcpAgentBinding.network_id == network_id,
NetworkDhcpAgentBinding.dhcp_agent_id == id).one()
except exc.NoResultFound:
raise agentscheduler.NetworkNotHostedByDhcpAgent(
network_id=network_id, agent_id=id)
context.session.delete(binding)
if self.dhcp_agent_notifier:
self.dhcp_agent_notifier.network_removed_from_agent(
context, network_id, agent.host)
def list_networks_on_dhcp_agent(self, context, id):
query = context.session.query(NetworkDhcpAgentBinding.network_id)
net_ids = query.filter(
NetworkDhcpAgentBinding.dhcp_agent_id == id).all()
if net_ids:
_ids = [item[0] for item in net_ids]
return {'networks':
self.get_networks(context, filters={'id': _ids})}
else:
return {'networks': []}
def list_active_networks_on_active_dhcp_agent(self, context, host):
agent = self._get_agent_by_type_and_host(
context, constants.AGENT_TYPE_DHCP, host)
if not agent.admin_state_up:
return []
query = context.session.query(NetworkDhcpAgentBinding.network_id)
net_ids = query.filter(
NetworkDhcpAgentBinding.dhcp_agent_id == agent.id).all()
if net_ids:
_ids = [item[0] for item in net_ids]
return self.get_networks(
context, filters={'id': _ids, 'admin_state_up': [True]})
else:
return []
def list_dhcp_agents_hosting_network(self, context, network_id):
dhcp_agents = self.get_dhcp_agents_hosting_networks(
context, [network_id])
agent_ids = [dhcp_agent.id for dhcp_agent in dhcp_agents]
if agent_ids:
return {
'agents':
self.get_agents(context, filters={'id': agent_ids})}
else:
return {'agents': []}
def add_router_to_l3_agent(self, context, id, router_id):
"""Add a l3 agent to host a router.
"""
router = self.get_router(context, router_id)
with context.session.begin(subtransactions=True):
agent_db = self._get_agent(context, id)
if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
not agent_db['admin_state_up'] or
not self.get_l3_agent_candidates(router, [agent_db])):
raise agentscheduler.InvalidL3Agent(id=id)
query = context.session.query(RouterL3AgentBinding)
try:
binding = query.filter(
RouterL3AgentBinding.l3_agent_id == agent_db.id,
RouterL3AgentBinding.router_id == router_id).one()
if binding:
raise agentscheduler.RouterHostedByL3Agent(
router_id=router_id, agent_id=id)
except exc.NoResultFound:
pass
result = self.auto_schedule_routers(context,
agent_db.host,
router_id)
if not result:
raise agentscheduler.RouterSchedulingFailed(
router_id=router_id, agent_id=id)
if self.l3_agent_notifier:
routers = self.get_sync_data(context, [router_id])
self.l3_agent_notifier.router_added_to_agent(
context, routers, agent_db.host)
def remove_router_from_l3_agent(self, context, id, router_id):
"""Remove the router from l3 agent. After it, the router
will be non-hosted until there is update which
lead to re schedule or be added to another agent manually."""
agent = self._get_agent(context, id)
with context.session.begin(subtransactions=True):
query = context.session.query(RouterL3AgentBinding)
query = query.filter(
RouterL3AgentBinding.router_id == router_id,
RouterL3AgentBinding.l3_agent_id == id)
try:
binding = query.one()
except exc.NoResultFound:
raise agentscheduler.RouterNotHostedByL3Agent(
router_id=router_id, agent_id=id)
context.session.delete(binding)
if self.l3_agent_notifier:
self.l3_agent_notifier.router_removed_from_agent(
context, router_id, agent.host)
def list_routers_on_l3_agent(self, context, id):
query = context.session.query(RouterL3AgentBinding.router_id)
router_ids = query.filter(
RouterL3AgentBinding.l3_agent_id == id).all()
if router_ids:
_ids = [item[0] for item in router_ids]
return {'routers':
self.get_routers(context, filters={'id': _ids})}
else:
return {'routers': []}
def list_active_sync_routers_on_active_l3_agent(
self, context, host, router_id):
agent = self._get_agent_by_type_and_host(
context, constants.AGENT_TYPE_L3, host)
if not agent.admin_state_up:
return []
query = context.session.query(RouterL3AgentBinding.router_id)
query = query.filter(
RouterL3AgentBinding.l3_agent_id == agent.id)
if router_id:
query = query.filter(RouterL3AgentBinding.router_id == router_id)
router_ids = query.all()
if router_ids:
_ids = [item[0] for item in router_ids]
routers = self.get_sync_data(context, router_ids=_ids,
active=True)
return routers
return []
def get_l3_agents_hosting_routers(self, context, router_ids,
admin_state_up=None,
active=None):
if not router_ids:
return []
query = context.session.query(RouterL3AgentBinding)
if len(router_ids) > 1:
query = query.options(joinedload('l3_agent')).filter(
RouterL3AgentBinding.router_id.in_(router_ids))
else:
query = query.options(joinedload('l3_agent')).filter(
RouterL3AgentBinding.router_id == router_ids[0])
if admin_state_up is not None:
query = (query.filter(agents_db.Agent.admin_state_up ==
admin_state_up))
l3_agents = [binding.l3_agent for binding in query.all()]
if active is not None:
l3_agents = [l3_agent for l3_agent in
l3_agents if not
agents_db.AgentDbMixin.is_agent_down(
l3_agent['heartbeat_timestamp'])]
return l3_agents
def _get_l3_bindings_hosting_routers(self, context, router_ids):
if not router_ids:
return []
query = context.session.query(RouterL3AgentBinding)
if len(router_ids) > 1:
query = query.options(joinedload('l3_agent')).filter(
RouterL3AgentBinding.router_id.in_(router_ids))
else:
query = query.options(joinedload('l3_agent')).filter(
RouterL3AgentBinding.router_id == router_ids[0])
return query.all()
def list_l3_agents_hosting_router(self, context, router_id):
with context.session.begin(subtransactions=True):
bindings = self._get_l3_bindings_hosting_routers(
context, [router_id])
results = []
for binding in bindings:
l3_agent_dict = self._make_agent_dict(binding.l3_agent)
results.append(l3_agent_dict)
if results:
return {'agents': results}
else:
return {'agents': []}
def schedule_network(self, context, request_network, created_network):
if self.network_scheduler:
result = self.network_scheduler.schedule(
self, context, request_network, created_network)
if not result:
LOG.warn(_('Fail scheduling network %s'), created_network)
def auto_schedule_networks(self, context, host):
if self.network_scheduler:
self.network_scheduler.auto_schedule_networks(self, context, host)
def get_l3_agents(self, context, active=None, filters=None):
query = context.session.query(agents_db.Agent)
query = query.filter(
agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
if active is not None:
query = (query.filter(agents_db.Agent.admin_state_up == active))
if filters:
for key, value in filters.iteritems():
column = getattr(agents_db.Agent, key, None)
if column:
query = query.filter(column.in_(value))
l3_agents = query.all()
if active is not None:
l3_agents = [l3_agent for l3_agent in
l3_agents if not
agents_db.AgentDbMixin.is_agent_down(
l3_agent['heartbeat_timestamp'])]
return l3_agents
def get_l3_agent_candidates(self, sync_router, l3_agents):
"""Get the valid l3 agents for the router from a list of l3_agents"""
candidates = []
for l3_agent in l3_agents:
if not l3_agent.admin_state_up:
continue
agent_conf = self.get_configuration_dict(l3_agent)
router_id = agent_conf.get('router_id', None)
use_namespaces = agent_conf.get('use_namespaces', True)
handle_internal_only_routers = agent_conf.get(
'handle_internal_only_routers', True)
gateway_external_network_id = agent_conf.get(
'gateway_external_network_id', None)
if not use_namespaces and router_id != sync_router['id']:
continue
ex_net_id = (sync_router['external_gateway_info'] or {}).get(
'network_id')
if ((not ex_net_id and not handle_internal_only_routers) or
(ex_net_id and gateway_external_network_id and
ex_net_id != gateway_external_network_id)):
continue
candidates.append(l3_agent)
return candidates
def auto_schedule_routers(self, context, host, router_id):
if self.router_scheduler:
return self.router_scheduler.auto_schedule_routers(
self, context, host, router_id)
def schedule_router(self, context, router):
if self.router_scheduler:
return self.router_scheduler.schedule(
self, context, router)
def schedule_routers(self, context, routers):
"""Schedule the routers to l3 agents.
"""
for router in routers:
self.schedule_router(context, router)

View File

@ -13,9 +13,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo.config import cfg
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common import utils
from quantum import manager from quantum import manager
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
@ -31,14 +34,24 @@ class DhcpRpcCallbackMixin(object):
host = kwargs.get('host') host = kwargs.get('host')
LOG.debug(_('Network list requested from %s'), host) LOG.debug(_('Network list requested from %s'), host)
plugin = manager.QuantumManager.get_plugin() plugin = manager.QuantumManager.get_plugin()
filters = dict(admin_state_up=[True]) if utils.is_extension_supported(
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
return [net['id'] for net in if cfg.CONF.network_auto_schedule:
plugin.get_networks(context, filters=filters)] plugin.auto_schedule_networks(context, host)
nets = plugin.list_active_networks_on_active_dhcp_agent(
context, host)
else:
filters = dict(admin_state_up=[True])
nets = plugin.get_networks(context, filters=filters)
return [net['id'] for net in nets]
def get_network_info(self, context, **kwargs): def get_network_info(self, context, **kwargs):
"""Retrieve and return a extended information about a network.""" """Retrieve and return a extended information about a network."""
network_id = kwargs.get('network_id') network_id = kwargs.get('network_id')
host = kwargs.get('host')
LOG.debug(_('Network %(network_id)s requested from '
'%(host)s'), {'network_id': network_id,
'host': host})
plugin = manager.QuantumManager.get_plugin() plugin = manager.QuantumManager.get_plugin()
network = plugin.get_network(context, network_id) network = plugin.get_network(context, network_id)
@ -62,7 +75,9 @@ class DhcpRpcCallbackMixin(object):
# a device id that combines host and network ids # a device id that combines host and network ids
LOG.debug(_('Port %(device_id)s for %(network_id)s requested from ' LOG.debug(_('Port %(device_id)s for %(network_id)s requested from '
'%(host)s'), locals()) '%(host)s'), {'device_id': device_id,
'network_id': network_id,
'host': host})
plugin = manager.QuantumManager.get_plugin() plugin = manager.QuantumManager.get_plugin()
retval = None retval = None

View File

@ -154,11 +154,12 @@ class ExtraRoute_db_mixin(l3_db.L3_NAT_db_mixin):
context, router['id']) context, router['id'])
return routers return routers
def get_sync_data(self, context, router_ids=None): def get_sync_data(self, context, router_ids=None, active=None):
"""Query routers and their related floating_ips, interfaces.""" """Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
routers = super(ExtraRoute_db_mixin, routers = super(ExtraRoute_db_mixin,
self).get_sync_data(context, router_ids) self).get_sync_data(context, router_ids,
active=active)
for router in routers: for router in routers:
router['routes'] = self._get_extra_routes_by_router_id( router['routes'] = self._get_extra_routes_by_router_id(
context, router['id']) context, router['id'])

View File

@ -23,11 +23,11 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
from sqlalchemy.sql import expression as expr from sqlalchemy.sql import expression as expr
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import constants as l3_constants from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import l3_rpc_agent_api
from quantum.db import model_base from quantum.db import model_base
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.extensions import l3 from quantum.extensions import l3
@ -328,7 +328,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
if len(fixed_ips) != 1: if len(fixed_ips) != 1:
msg = _('Router port must have exactly one fixed IP') msg = _('Router port must have exactly one fixed IP')
raise q_exc.BadRequest(resource='router', msg=msg) raise q_exc.BadRequest(resource='router', msg=msg)
subnet = self._get_subnet(context, fixed_ips[0]['subnet_id']) subnet_id = fixed_ips[0]['subnet_id']
subnet = self._get_subnet(context, subnet_id)
self._check_for_dup_router_subnet(context, router_id, self._check_for_dup_router_subnet(context, router_id,
port['network_id'], port['network_id'],
subnet['id'], subnet['id'],
@ -360,7 +361,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'name': ''}}) 'name': ''}})
routers = self.get_sync_data(context.elevated(), [router_id]) routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) l3_rpc_agent_api.L3AgentNotify.routers_updated(
context, routers, 'add_router_interface',
{'network_id': port['network_id'],
'subnet_id': subnet_id})
info = {'port_id': port['id'], info = {'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']} 'subnet_id': port['fixed_ips'][0]['subnet_id']}
notifier_api.notify(context, notifier_api.notify(context,
@ -409,6 +413,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
subnet_id = port_db['fixed_ips'][0]['subnet_id'] subnet_id = port_db['fixed_ips'][0]['subnet_id']
self._confirm_router_interface_not_in_use( self._confirm_router_interface_not_in_use(
context, router_id, subnet_id) context, router_id, subnet_id)
_network_id = port_db['network_id']
self.delete_port(context, port_db['id'], l3_port_check=False) self.delete_port(context, port_db['id'], l3_port_check=False)
elif 'subnet_id' in interface_info: elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id'] subnet_id = interface_info['subnet_id']
@ -428,6 +433,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
for p in ports: for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id: if p['fixed_ips'][0]['subnet_id'] == subnet_id:
port_id = p['id'] port_id = p['id']
_network_id = p['network_id']
self.delete_port(context, p['id'], l3_port_check=False) self.delete_port(context, p['id'], l3_port_check=False)
found = True found = True
break break
@ -438,7 +444,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id, raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
subnet_id=subnet_id) subnet_id=subnet_id)
routers = self.get_sync_data(context.elevated(), [router_id]) routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) l3_rpc_agent_api.L3AgentNotify.routers_updated(
context, routers, 'remove_router_interface',
{'network_id': _network_id,
'subnet_id': subnet_id})
notifier_api.notify(context, notifier_api.notify(context,
notifier_api.publisher_id('network'), notifier_api.publisher_id('network'),
'router.interface.delete', 'router.interface.delete',
@ -649,7 +658,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router_id = floatingip_db['router_id'] router_id = floatingip_db['router_id']
if router_id: if router_id:
routers = self.get_sync_data(context.elevated(), [router_id]) routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
'create_floatingip')
return self._make_floatingip_dict(floatingip_db) return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip): def update_floatingip(self, context, id, floatingip):
@ -671,7 +681,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router_ids.append(router_id) router_ids.append(router_id)
if router_ids: if router_ids:
routers = self.get_sync_data(context.elevated(), router_ids) routers = self.get_sync_data(context.elevated(), router_ids)
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
'update_floatingip')
return self._make_floatingip_dict(floatingip_db) return self._make_floatingip_dict(floatingip_db)
def delete_floatingip(self, context, id): def delete_floatingip(self, context, id):
@ -684,7 +695,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
l3_port_check=False) l3_port_check=False)
if router_id: if router_id:
routers = self.get_sync_data(context.elevated(), [router_id]) routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
'delete_floatingip')
def get_floatingip(self, context, id, fields=None): def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id) floatingip = self._get_floatingip(context, id)
@ -816,7 +828,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
else: else:
return [n for n in nets if n['id'] not in ext_nets] return [n for n in nets if n['id'] not in ext_nets]
def _get_sync_routers(self, context, router_ids=None): def _get_sync_routers(self, context, router_ids=None, active=None):
"""Query routers and their gw ports for l3 agent. """Query routers and their gw ports for l3 agent.
Query routers with the router_ids. The gateway ports, if any, Query routers with the router_ids. The gateway ports, if any,
@ -831,7 +843,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
""" """
router_query = context.session.query(Router) router_query = context.session.query(Router)
if router_ids: if router_ids:
router_query = router_query.filter(Router.id.in_(router_ids)) if 1 == len(router_ids):
router_query = router_query.filter(Router.id == router_ids[0])
else:
router_query = router_query.filter(Router.id.in_(router_ids))
if active is not None:
router_query = router_query.filter(Router.admin_state_up == active)
routers = router_query.all() routers = router_query.all()
gw_port_ids = [] gw_port_ids = []
if not routers: if not routers:
@ -842,7 +859,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
gw_port_ids.append(gw_port_id) gw_port_ids.append(gw_port_id)
gw_ports = [] gw_ports = []
if gw_port_ids: if gw_port_ids:
gw_ports = self._get_sync_gw_ports(context, gw_port_ids) gw_ports = self.get_sync_gw_ports(context, gw_port_ids)
gw_port_id_gw_port_dict = {} gw_port_id_gw_port_dict = {}
for gw_port in gw_ports: for gw_port in gw_ports:
gw_port_id_gw_port_dict[gw_port['id']] = gw_port gw_port_id_gw_port_dict[gw_port['id']] = gw_port
@ -862,7 +879,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
return [] return []
return self.get_floatingips(context, {'router_id': router_ids}) return self.get_floatingips(context, {'router_id': router_ids})
def _get_sync_gw_ports(self, context, gw_port_ids): def get_sync_gw_ports(self, context, gw_port_ids):
if not gw_port_ids: if not gw_port_ids:
return [] return []
filters = {'id': gw_port_ids} filters = {'id': gw_port_ids}
@ -871,12 +888,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
self._populate_subnet_for_ports(context, gw_ports) self._populate_subnet_for_ports(context, gw_ports)
return gw_ports return gw_ports
def _get_sync_interfaces(self, context, router_ids): def get_sync_interfaces(self, context, router_ids,
device_owner=DEVICE_OWNER_ROUTER_INTF):
"""Query router interfaces that relate to list of router_ids.""" """Query router interfaces that relate to list of router_ids."""
if not router_ids: if not router_ids:
return [] return []
filters = {'device_id': router_ids, filters = {'device_id': router_ids,
'device_owner': [DEVICE_OWNER_ROUTER_INTF]} 'device_owner': [device_owner]}
interfaces = self.get_ports(context, filters) interfaces = self.get_ports(context, filters)
if interfaces: if interfaces:
self._populate_subnet_for_ports(context, interfaces) self._populate_subnet_for_ports(context, interfaces)
@ -934,14 +952,15 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
router[l3_constants.INTERFACE_KEY] = router_interfaces router[l3_constants.INTERFACE_KEY] = router_interfaces
return routers_dict.values() return routers_dict.values()
def get_sync_data(self, context, router_ids=None): def get_sync_data(self, context, router_ids=None, active=None):
"""Query routers and their related floating_ips, interfaces.""" """Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
routers = self._get_sync_routers(context, routers = self._get_sync_routers(context,
router_ids) router_ids=router_ids,
active=active)
router_ids = [router['id'] for router in routers] router_ids = [router['id'] for router in routers]
floating_ips = self._get_sync_floating_ips(context, router_ids) floating_ips = self._get_sync_floating_ips(context, router_ids)
interfaces = self._get_sync_interfaces(context, router_ids) interfaces = self.get_sync_interfaces(context, router_ids)
return self._process_sync_data(routers, interfaces, floating_ips) return self._process_sync_data(routers, interfaces, floating_ips)
def get_external_network_id(self, context): def get_external_network_id(self, context):

View File

@ -1,50 +0,0 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum.common import topics
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class L3AgentNotifyAPI(proxy.RpcProxy):
"""API for plugin to notify L3 agent."""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.L3_AGENT):
super(L3AgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def router_deleted(self, context, router_id):
LOG.debug(_('Notify agent the router %s is deleted'), router_id)
self.cast(context,
self.make_msg('router_deleted',
router_id=router_id),
topic=self.topic)
def routers_updated(self, context, routers):
if routers:
LOG.debug(_('Notify agent routers were updated:\n %s'),
jsonutils.dumps(routers, indent=5))
self.cast(context,
self.make_msg('routers_updated',
routers=routers),
topic=self.topic)
L3AgentNotify = L3AgentNotifyAPI()

View File

@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo.config import cfg
from quantum.common import constants
from quantum.common import utils
from quantum import context as quantum_context from quantum import context as quantum_context
from quantum import manager from quantum import manager
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
@ -34,10 +38,17 @@ class L3RpcCallbackMixin(object):
with their interfaces and floating_ips with their interfaces and floating_ips
""" """
router_id = kwargs.get('router_id') router_id = kwargs.get('router_id')
# TODO(gongysh) we will use host in kwargs for multi host BP host = kwargs.get('host')
context = quantum_context.get_admin_context() context = quantum_context.get_admin_context()
plugin = manager.QuantumManager.get_plugin() plugin = manager.QuantumManager.get_plugin()
routers = plugin.get_sync_data(context, router_id) if utils.is_extension_supported(
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
if cfg.CONF.router_auto_schedule:
plugin.auto_schedule_routers(context, host, router_id)
routers = plugin.list_active_sync_routers_on_active_l3_agent(
context, host, router_id)
else:
routers = plugin.get_sync_data(context, router_id)
LOG.debug(_("Routers returned to l3 agent:\n %s"), LOG.debug(_("Routers returned to l3 agent:\n %s"),
jsonutils.dumps(routers, indent=5)) jsonutils.dumps(routers, indent=5))
return routers return routers

View File

@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""agent scheduler
Revision ID: 4692d074d587
Revises: 3b54bf9e29f7
Create Date: 2013-02-21 23:01:50.370306
"""
# revision identifiers, used by Alembic.
revision = '4692d074d587'
down_revision = '3b54bf9e29f7'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2'
]
from alembic import op
import sqlalchemy as sa
from quantum.db import migration
def upgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
### commands auto generated by Alembic - please adjust! ###
op.create_table(
'networkdhcpagentbindings',
sa.Column('network_id', sa.String(length=36), nullable=False),
sa.Column('dhcp_agent_id', sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(['dhcp_agent_id'], ['agents.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['network_id'], ['networks.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('network_id', 'dhcp_agent_id')
)
op.create_table(
'routerl3agentbindings',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('router_id', sa.String(length=36), nullable=True),
sa.Column('l3_agent_id', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
### end Alembic commands ###
def downgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
### commands auto generated by Alembic - please adjust! ###
op.drop_table('routerl3agentbindings')
op.drop_table('networkdhcpagentbindings')
### end Alembic commands ###

View File

@ -0,0 +1,252 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack, LLC.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import abstractmethod
from quantum.api import extensions
from quantum.api.v2 import base
from quantum.api.v2 import resource
from quantum.common import constants
from quantum.common import exceptions
from quantum.extensions import agent
from quantum import manager
from quantum import policy
from quantum import wsgi
DHCP_NET = 'dhcp-network'
DHCP_NETS = DHCP_NET + 's'
DHCP_AGENT = 'dhcp-agent'
DHCP_AGENTS = DHCP_AGENT + 's'
L3_ROUTER = 'l3-router'
L3_ROUTERS = L3_ROUTER + 's'
L3_AGENT = 'l3-agent'
L3_AGENTS = L3_AGENT + 's'
class NetworkSchedulerController(wsgi.Controller):
def index(self, request, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"get_%s" % DHCP_NETS,
{},
plugin=plugin)
return plugin.list_networks_on_dhcp_agent(
request.context, kwargs['agent_id'])
def create(self, request, body, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"create_%s" % DHCP_NET,
{},
plugin=plugin)
return plugin.add_network_to_dhcp_agent(
request.context, kwargs['agent_id'], body['network_id'])
def delete(self, request, id, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"delete_%s" % DHCP_NET,
{},
plugin=plugin)
return plugin.remove_network_from_dhcp_agent(
request.context, kwargs['agent_id'], id)
class RouterSchedulerController(wsgi.Controller):
def index(self, request, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"get_%s" % L3_ROUTERS,
{},
plugin=plugin)
return plugin.list_routers_on_l3_agent(
request.context, kwargs['agent_id'])
def create(self, request, body, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"create_%s" % L3_ROUTER,
{},
plugin=plugin)
return plugin.add_router_to_l3_agent(
request.context,
kwargs['agent_id'],
body['router_id'])
def delete(self, request, id, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"delete_%s" % L3_ROUTER,
{},
plugin=plugin)
return plugin.remove_router_from_l3_agent(
request.context, kwargs['agent_id'], id)
class DhcpAgentsHostingNetworkController(wsgi.Controller):
def index(self, request, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"get_%s" % DHCP_AGENTS,
{},
plugin=plugin)
return plugin.list_dhcp_agents_hosting_network(
request.context, kwargs['network_id'])
class L3AgentsHostingRouterController(wsgi.Controller):
def index(self, request, **kwargs):
plugin = manager.QuantumManager.get_plugin()
policy.enforce(request.context,
"get_%s" % L3_AGENTS,
{},
plugin=plugin)
return plugin.list_l3_agents_hosting_router(
request.context, kwargs['router_id'])
class Agentscheduler(extensions.ExtensionDescriptor):
"""Extension class supporting agent scheduler.
"""
@classmethod
def get_name(cls):
return "Agent Schedulers"
@classmethod
def get_alias(cls):
return constants.AGENT_SCHEDULER_EXT_ALIAS
@classmethod
def get_description(cls):
return "Schedule resources among agents"
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/agent_scheduler/api/v1.0"
@classmethod
def get_updated(cls):
return "2013-02-03T10:00:00-00:00"
@classmethod
def get_resources(cls):
"""Returns Ext Resources """
exts = []
parent = dict(member_name="agent",
collection_name="agents")
controller = resource.Resource(NetworkSchedulerController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
DHCP_NETS, controller, parent))
controller = resource.Resource(RouterSchedulerController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
L3_ROUTERS, controller, parent))
parent = dict(member_name="network",
collection_name="networks")
controller = resource.Resource(DhcpAgentsHostingNetworkController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
DHCP_AGENTS, controller, parent))
parent = dict(member_name="router",
collection_name="routers")
controller = resource.Resource(L3AgentsHostingRouterController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
L3_AGENTS, controller, parent))
return exts
def get_extended_resources(self, version):
return {}
class InvalidDHCPAgent(agent.AgentNotFound):
message = _("Agent %(id)s is not a valid DHCP Agent or has been disabled")
class NetworkHostedByDHCPAgent(exceptions.Conflict):
message = _("The network %(network_id)s has been already hosted"
" by the DHCP Agent %(agent_id)s.")
class NetworkNotHostedByDhcpAgent(exceptions.Conflict):
message = _("The network %(network_id)s is not hosted"
" by the DHCP agent %(agent_id)s.")
class InvalidL3Agent(agent.AgentNotFound):
message = _("Agent %(id)s is not a L3 Agent or has been disabled")
class RouterHostedByL3Agent(exceptions.Conflict):
message = _("The router %(router_id)s has been already hosted"
" by the L3 Agent %(agent_id)s.")
class RouterSchedulingFailed(exceptions.Conflict):
message = _("Failed scheduling router %(router_id)s to"
" the L3 Agent %(agent_id)s.")
class RouterNotHostedByL3Agent(exceptions.Conflict):
message = _("The router %(router_id)s is not hosted"
" by L3 agent %(agent_id)s.")
class AgentSchedulerPluginBase(object):
""" REST API to operate the agent scheduler.
All of method must be in an admin context.
"""
@abstractmethod
def add_network_to_dhcp_agent(self, context, id, network_id):
pass
@abstractmethod
def remove_network_from_dhcp_agent(self, context, id, network_id):
pass
@abstractmethod
def list_networks_on_dhcp_agent(self, context, id):
pass
@abstractmethod
def list_dhcp_agents_hosting_network(self, context, network_id):
pass
@abstractmethod
def add_router_to_l3_agent(self, context, id, router_id):
pass
@abstractmethod
def remove_router_from_l3_agent(self, context, id, router_id):
pass
@abstractmethod
def list_routers_on_l3_agent(self, context, id):
pass
@abstractmethod
def list_l3_agents_hosting_router(self, context, router_id):
pass

View File

@ -17,6 +17,7 @@
from oslo.config import cfg from oslo.config import cfg
from quantum.agent.common import config from quantum.agent.common import config
from quantum import scheduler
DEFAULT_BRIDGE_MAPPINGS = [] DEFAULT_BRIDGE_MAPPINGS = []
@ -64,3 +65,4 @@ cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT") cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF) config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF) config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS)

View File

@ -25,12 +25,15 @@ import sys
from oslo.config import cfg from oslo.config import cfg
from quantum.agent import securitygroups_rpc as sg_rpc from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import constants as q_const from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc from quantum.common import rpc as q_rpc
from quantum.common import topics from quantum.common import topics
from quantum.db import agents_db from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db from quantum.db import extraroute_db
@ -41,6 +44,7 @@ from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings from quantum.extensions import portbindings
from quantum.extensions import providernet as provider from quantum.extensions import providernet as provider
from quantum.extensions import securitygroup as ext_sg from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy from quantum.openstack.common.rpc import proxy
@ -211,7 +215,9 @@ class AgentNotifierApi(proxy.RpcProxy,
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
extraroute_db.ExtraRoute_db_mixin, extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin, sg_db_rpc.SecurityGroupServerRpcMixin,
agents_db.AgentDbMixin): agents_db.AgentDbMixin,
agentschedulers_db.AgentSchedulerDbMixin):
"""Implement the Quantum abstractions using Open vSwitch. """Implement the Quantum abstractions using Open vSwitch.
Depending on whether tunneling is enabled, either a GRE tunnel or Depending on whether tunneling is enabled, either a GRE tunnel or
@ -238,8 +244,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
supported_extension_aliases = ["provider", "router", supported_extension_aliases = ["provider", "router",
"binding", "quotas", "security-group", "binding", "quotas", "security-group",
"agent", "agent", "extraroute", "agent_scheduler"]
"extraroute"]
network_view = "extension:provider_network:view" network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set" network_set = "extension:provider_network:set"
@ -269,12 +274,18 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
"Agent terminated!")) "Agent terminated!"))
sys.exit(1) sys.exit(1)
self.setup_rpc() self.setup_rpc()
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver)
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
def setup_rpc(self): def setup_rpc(self):
# RPC support # RPC support
self.topic = topics.PLUGIN self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True) self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT) self.notifier = AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.callbacks = OVSRpcCallbacks(self.notifier) self.callbacks = OVSRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher() self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher, self.conn.create_consumer(self.topic, self.dispatcher,
@ -486,6 +497,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_l3(context, net) self._extend_network_dict_l3(context, net)
# note - exception will rollback entire transaction # note - exception will rollback entire transaction
LOG.debug(_("Created network: %s"), net['id']) LOG.debug(_("Created network: %s"), net['id'])
self.schedule_network(context, network['network'], net)
return net return net
def update_network(self, context, id, network): def update_network(self, context, id, network):
@ -569,6 +581,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
else: else:
self.notifier.security_groups_member_updated( self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS)) context, port.get(ext_sg.SECURITYGROUPS))
net = self.get_network(context, port['network_id'])
self.schedule_network(context, None, net)
return self._extend_port_dict_binding(context, port) return self._extend_port_dict_binding(context, port)
def get_port(self, context, id, fields=None): def get_port(self, context, id, fields=None):
@ -636,3 +650,20 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.notifier.security_groups_member_updated( self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS)) context, port.get(ext_sg.SECURITYGROUPS))
def update_agent(self, context, id, agent):
original_agent = self.get_agent(context, id)
result = super(OVSQuantumPluginV2, self).update_agent(
context, id, agent)
agent_data = agent['agent']
if ('admin_state_up' in agent_data and
original_agent['admin_state_up'] != agent_data['admin_state_up']):
if original_agent['agent_type'] == q_const.AGENT_TYPE_DHCP:
self.dhcp_agent_notifier.agent_updated(
context, agent_data['admin_state_up'],
original_agent['host'])
elif original_agent['agent_type'] == q_const.AGENT_TYPE_L3:
self.l3_agent_notifier.agent_updated(
context, agent_data['admin_state_up'],
original_agent['host'])
return result

View File

@ -51,6 +51,7 @@ def init():
raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file) raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
# pass _set_brain to read_cached_file so that the policy brain # pass _set_brain to read_cached_file so that the policy brain
# is reset only if the file has changed # is reset only if the file has changed
LOG.debug(_("loading policy file at %s"), _POLICY_PATH)
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE, utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
reload_func=_set_rules) reload_func=_set_rules)

View File

@ -0,0 +1,34 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
AGENTS_SCHEDULER_OPTS = [
cfg.StrOpt('network_scheduler_driver',
default='quantum.scheduler.'
'dhcp_agent_scheduler.ChanceScheduler',
help=_('Driver to use for scheduling network to DHCP agent')),
cfg.StrOpt('router_scheduler_driver',
default='quantum.scheduler.l3_agent_scheduler.ChanceScheduler',
help=_('Driver to use for scheduling '
'router to a default L3 agent')),
cfg.BoolOpt('network_auto_schedule', default=True,
help=_('Allow auto scheduling networks to DHCP agent.')),
cfg.BoolOpt('router_auto_schedule', default=True,
help=_('Allow auto scheduling routers to L3 agent.')),
]

View File

@ -0,0 +1,108 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from sqlalchemy.orm import exc
from sqlalchemy.sql import exists
from quantum.common import constants
from quantum.db import models_v2
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class ChanceScheduler(object):
"""Allocate a DHCP agent for a network in a random way.
More sophisticated scheduler (similar to filter scheduler in nova?)
can be introduced later."""
def schedule(self, plugin, context, request_network, network):
"""Schedule the network to an active DHCP agent if there
is no active DHCP agent hosting it.
"""
#TODO(gongysh) don't schedule the networks with only
# subnets whose enable_dhcp is false
with context.session.begin(subtransactions=True):
dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
context, [network['id']], active=True)
if dhcp_agents:
LOG.debug(_('Network %s is hosted already'),
network['id'])
return False
enabled_dhcp_agents = plugin.get_agents_db(
context, filters={
'agent_type': [constants.AGENT_TYPE_DHCP],
'admin_state_up': [True]})
if not enabled_dhcp_agents:
LOG.warn(_('No enabled DHCP agents'))
return False
active_dhcp_agents = [enabled_dhcp_agent for enabled_dhcp_agent in
enabled_dhcp_agents if not
agents_db.AgentDbMixin.is_agent_down(
enabled_dhcp_agent['heartbeat_timestamp'])]
if not active_dhcp_agents:
LOG.warn(_('No active DHCP agents'))
return False
chosen_agent = random.choice(active_dhcp_agents)
binding = agentschedulers_db.NetworkDhcpAgentBinding()
binding.dhcp_agent = chosen_agent
binding.network_id = network['id']
context.session.add(binding)
LOG.debug(_('Network %(network_id)s is scheduled to be hosted by '
'DHCP agent %(agent_id)s'),
{'network_id': network['id'],
'agent_id': chosen_agent['id']})
return True
def auto_schedule_networks(self, plugin, context, host):
"""Schedule non-hosted networks to the DHCP agent on
the specified host."""
with context.session.begin(subtransactions=True):
query = context.session.query(agents_db.Agent)
query = query.filter(agents_db.Agent.agent_type ==
constants.AGENT_TYPE_DHCP,
agents_db.Agent.host == host,
agents_db.Agent.admin_state_up == True)
try:
dhcp_agent = query.one()
except (exc.MultipleResultsFound, exc.NoResultFound):
LOG.warn(_('No enabled DHCP agent on host %s'),
host)
return False
if agents_db.AgentDbMixin.is_agent_down(
dhcp_agent.heartbeat_timestamp):
LOG.warn(_('DHCP agent %s is not active'), dhcp_agent.id)
#TODO(gongysh) consider the disabled agent's network
net_stmt = ~exists().where(
models_v2.Network.id ==
agentschedulers_db.NetworkDhcpAgentBinding.network_id)
net_ids = context.session.query(
models_v2.Network.id).filter(net_stmt).all()
if not net_ids:
LOG.debug(_('No non-hosted networks'))
return False
for net_id in net_ids:
binding = agentschedulers_db.NetworkDhcpAgentBinding()
binding.dhcp_agent = dhcp_agent
binding.network_id = net_id[0]
context.session.add(binding)
return True

View File

@ -0,0 +1,149 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from sqlalchemy.orm import exc
from sqlalchemy.sql import exists
from quantum.common import constants
from quantum.db import l3_db
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class ChanceScheduler(object):
"""Allocate a L3 agent for a router in a random way.
More sophisticated scheduler (similar to filter scheduler in nova?)
can be introduced later."""
def auto_schedule_routers(self, plugin, context, host, router_id):
"""Schedule non-hosted routers to L3 Agent running on host.
If router_id is given, only this router is scheduled
if it is not hosted yet.
Don't schedule the routers which are hosted already
by active l3 agents.
"""
with context.session.begin(subtransactions=True):
# query if we have valid l3 agent on the host
query = context.session.query(agents_db.Agent)
query = query.filter(agents_db.Agent.agent_type ==
constants.AGENT_TYPE_L3,
agents_db.Agent.host == host,
agents_db.Agent.admin_state_up == True)
try:
l3_agent = query.one()
except (exc.MultipleResultsFound, exc.NoResultFound):
LOG.debug(_('No enabled L3 agent on host %s'),
host)
return False
if agents_db.AgentDbMixin.is_agent_down(
l3_agent.heartbeat_timestamp):
LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
# check if the specified router is hosted
if router_id:
l3_agents = plugin.get_l3_agents_hosting_routers(
context, [router_id], admin_state_up=True)
if l3_agents:
LOG.debug(_('Router %(router_id)s has already been hosted'
' by L3 agent %(agent_id)s'),
{'router_id': router_id,
'agent_id': l3_agents[0]['id']})
return False
# get the router ids
if router_id:
router_ids = [(router_id,)]
else:
# get all routers that are not hosted
#TODO(gongysh) consider the disabled agent's router
stmt = ~exists().where(
l3_db.Router.id ==
agentschedulers_db.RouterL3AgentBinding.router_id)
router_ids = context.session.query(
l3_db.Router.id).filter(stmt).all()
if not router_ids:
LOG.debug(_('No non-hosted routers'))
return False
# check if the configuration of l3 agent is compatible
# with the router
router_ids = [router_id[0] for router_id in router_ids]
routers = plugin.get_routers(context, filters={'id': router_ids})
to_removed_ids = []
for router in routers:
candidates = plugin.get_l3_agent_candidates(router, [l3_agent])
if not candidates:
to_removed_ids.append(router['id'])
router_ids = list(set(router_ids) - set(to_removed_ids))
if not router_ids:
LOG.warn(_('No routers compatible with L3 agent configuration'
' on host %s', host))
return False
# binding
for router_id in router_ids:
binding = agentschedulers_db.RouterL3AgentBinding()
binding.l3_agent = l3_agent
binding.router_id = router_id
binding.default = True
context.session.add(binding)
return True
def schedule(self, plugin, context, sync_router):
"""Schedule the router to an active L3 agent if there
is no enable L3 agent hosting it.
"""
with context.session.begin(subtransactions=True):
# allow one router is hosted by just
# one enabled l3 agent hosting since active is just a
# timing problem. Non-active l3 agent can return to
# active any time
l3_agents = plugin.get_l3_agents_hosting_routers(
context, [sync_router['id']], admin_state_up=True)
if l3_agents:
LOG.debug(_('Router %(router_id)s has already been hosted'
' by L3 agent %(agent_id)s'),
{'router_id': sync_router['id'],
'agent_id': l3_agents[0]['id']})
return False
active_l3_agents = plugin.get_l3_agents(context, active=True)
if not active_l3_agents:
LOG.warn(_('No active L3 agents'))
return False
candidates = plugin.get_l3_agent_candidates(sync_router,
active_l3_agents)
if not candidates:
LOG.warn(_('No L3 agents can host the router %s'),
sync_router['id'])
return False
chosen_agent = random.choice(candidates)
binding = agentschedulers_db.RouterL3AgentBinding()
binding.l3_agent = chosen_agent
binding.router_id = sync_router['id']
context.session.add(binding)
LOG.debug(_('Router %(router_id)s is scheduled to '
'L3 agent %(agent_id)s'),
{'router_id': sync_router['id'],
'agent_id': chosen_agent['id']})
return True

View File

@ -0,0 +1,803 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import copy
import mock
from webob import exc
from quantum.api import extensions
from quantum.common import constants
from quantum import context
from quantum.db import agents_db
from quantum.db import dhcp_rpc_base
from quantum.db import l3_rpc_base
from quantum.extensions import agentscheduler
from quantum import manager
from quantum.openstack.common import uuidutils
from quantum.plugins.openvswitch.ovs_quantum_plugin import OVSQuantumPluginV2
from quantum.tests.unit import test_agent_ext_plugin
from quantum.tests.unit.testlib_api import create_request
from quantum.tests.unit import test_db_plugin as test_plugin
from quantum.tests.unit import test_extensions
from quantum.tests.unit import test_l3_plugin
from quantum.wsgi import Serializer
L3_HOSTA = 'hosta'
DHCP_HOSTA = 'hosta'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
class AgentSchedulerTestMixIn(object):
def _request_list(self, path, admin_context=True,
expected_code=exc.HTTPOk.code):
req = self._path_req(path, admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
return self.deserialize(self.fmt, res)
def _path_req(self, path, method='GET', data=None,
query_string=None,
admin_context=True):
content_type = 'application/%s' % self.fmt
body = None
if data is not None: # empty dict is valid
body = Serializer().serialize(data, content_type)
if admin_context:
return create_request(
path, body, content_type, method, query_string=query_string)
else:
return create_request(
path, body, content_type, method, query_string=query_string,
context=context.Context('', 'tenant_id'))
def _path_create_request(self, path, data, admin_context=True):
return self._path_req(path, method='POST', data=data,
admin_context=admin_context)
def _path_show_request(self, path, admin_context=True):
return self._path_req(path, admin_context=admin_context)
def _path_delete_request(self, path, admin_context=True):
return self._path_req(path, method='DELETE',
admin_context=admin_context)
def _path_update_request(self, path, data, admin_context=True):
return self._path_req(path, method='PUT', data=data,
admin_context=admin_context)
def _list_routers_hosted_by_l3_agent(self, agent_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (agent_id,
agentscheduler.L3_ROUTERS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
def _list_networks_hosted_by_dhcp_agent(self, agent_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (agent_id,
agentscheduler.DHCP_NETS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
def _list_l3_agents_hosting_router(self, router_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/routers/%s/%s.%s" % (router_id,
agentscheduler.L3_AGENTS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
def _list_dhcp_agents_hosting_network(self, network_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/networks/%s/%s.%s" % (network_id,
agentscheduler.DHCP_AGENTS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
def _add_router_to_l3_agent(self, id, router_id,
expected_code=exc.HTTPCreated.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (id,
agentscheduler.L3_ROUTERS,
self.fmt)
req = self._path_create_request(path,
{'router_id': router_id},
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
def _add_network_to_dhcp_agent(self, id, network_id,
expected_code=exc.HTTPCreated.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (id,
agentscheduler.DHCP_NETS,
self.fmt)
req = self._path_create_request(path,
{'network_id': network_id},
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
def _remove_network_from_dhcp_agent(self, id, network_id,
expected_code=exc.HTTPNoContent.code,
admin_context=True):
path = "/agents/%s/%s/%s.%s" % (id,
agentscheduler.DHCP_NETS,
network_id,
self.fmt)
req = self._path_delete_request(path,
admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
def _remove_router_from_l3_agent(self, id, router_id,
expected_code=exc.HTTPNoContent.code,
admin_context=True):
path = "/agents/%s/%s/%s.%s" % (id,
agentscheduler.L3_ROUTERS,
router_id,
self.fmt)
req = self._path_delete_request(path, admin_context=admin_context)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
def _register_one_agent_state(self, agent_state):
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': agent_state})
def _disable_agent(self, agent_id, admin_state_up=False):
new_agent = {}
new_agent['agent'] = {}
new_agent['agent']['admin_state_up'] = admin_state_up
self._update('agents', agent_id, new_agent)
def _get_agent_id(self, agent_type, host):
agents = self._list_agents()
for agent in agents['agents']:
if (agent['agent_type'] == agent_type and
agent['host'] == host):
return agent['id']
class AgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
test_agent_ext_plugin.AgentDBTestMixIn,
AgentSchedulerTestMixIn,
test_plugin.QuantumDbPluginV2TestCase):
fmt = 'json'
def setUp(self):
plugin = ('quantum.plugins.openvswitch.'
'ovs_quantum_plugin.OVSQuantumPluginV2')
self.dhcp_notifier_cls_p = mock.patch(
'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
'DhcpAgentNotifyAPI')
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
super(AgentSchedulerTestCase, self).setUp(plugin)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
def test_report_states(self):
self._register_agent_states()
agents = self._list_agents()
self.assertEqual(4, len(agents['agents']))
def test_network_scheduling_on_network_creation(self):
self._register_agent_states()
with self.network() as net:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net['network']['id'])
self.assertEqual(1, len(dhcp_agents['agents']))
def test_network_auto_schedule_with_disabled(self):
with contextlib.nested(self.network(),
self.network()):
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTC)
self._disable_agent(hosta_id)
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
# second agent will host all the networks since first is disabled.
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
networks = self._list_networks_hosted_by_dhcp_agent(hostc_id)
num_hostc_nets = len(networks['networks'])
networks = self._list_networks_hosted_by_dhcp_agent(hosta_id)
num_hosta_nets = len(networks['networks'])
self.assertEqual(0, num_hosta_nets)
self.assertEqual(2, num_hostc_nets)
def test_network_auto_schedule_with_hosted(self):
# one agent hosts all the networks, other hosts none
with contextlib.nested(self.network(),
self.network()) as (net1, net2):
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
self._register_agent_states()
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
# second agent will not host the network since first has got it.
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
dhcp_agents = self._list_dhcp_agents_hosting_network(
net1['network']['id'])
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTC)
hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
num_hosta_nets = len(hosta_nets['networks'])
hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
num_hostc_nets = len(hostc_nets['networks'])
self.assertEqual(2, num_hosta_nets)
self.assertEqual(0, num_hostc_nets)
self.assertEqual(1, len(dhcp_agents['agents']))
self.assertEqual(DHCP_HOSTA, dhcp_agents['agents'][0]['host'])
def test_network_auto_schedule_with_hosted_2(self):
# one agent hosts one network
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
dhcp_hostc = copy.deepcopy(dhcp_hosta)
dhcp_hostc['host'] = DHCP_HOSTC
with self.network() as net1:
self._register_one_agent_state(dhcp_hosta)
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
with self.network() as net2:
self._register_one_agent_state(dhcp_hostc)
dhcp_rpc.get_active_networks(self.adminContext,
host=DHCP_HOSTC)
dhcp_agents_1 = self._list_dhcp_agents_hosting_network(
net1['network']['id'])
dhcp_agents_2 = self._list_dhcp_agents_hosting_network(
net2['network']['id'])
hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
num_hosta_nets = len(hosta_nets['networks'])
hostc_id = self._get_agent_id(
constants.AGENT_TYPE_DHCP,
DHCP_HOSTC)
hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
num_hostc_nets = len(hostc_nets['networks'])
self.assertEqual(1, num_hosta_nets)
self.assertEqual(1, num_hostc_nets)
self.assertEqual(1, len(dhcp_agents_1['agents']))
self.assertEqual(1, len(dhcp_agents_2['agents']))
self.assertEqual(DHCP_HOSTA, dhcp_agents_1['agents'][0]['host'])
self.assertEqual(DHCP_HOSTC, dhcp_agents_2['agents'][0]['host'])
def test_network_scheduling_on_port_creation(self):
with self.subnet() as subnet:
dhcp_agents = self._list_dhcp_agents_hosting_network(
subnet['subnet']['network_id'])
result0 = len(dhcp_agents['agents'])
self._register_agent_states()
with self.port(subnet=subnet,
device_owner="compute:test:" + DHCP_HOSTA) as port:
dhcp_agents = self._list_dhcp_agents_hosting_network(
port['port']['network_id'])
result1 = len(dhcp_agents['agents'])
self.assertEqual(0, result0)
self.assertEqual(1, result1)
def test_network_scheduler_with_disabled_agent(self):
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
self._register_one_agent_state(dhcp_hosta)
with self.network() as net1:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net1['network']['id'])
self.assertEqual(1, len(dhcp_agents['agents']))
agents = self._list_agents()
self._disable_agent(agents['agents'][0]['id'])
with self.network() as net2:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net2['network']['id'])
self.assertEqual(0, len(dhcp_agents['agents']))
def test_network_scheduler_with_down_agent(self):
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
self._register_one_agent_state(dhcp_hosta)
is_agent_down_str = 'quantum.db.agents_db.AgentDbMixin.is_agent_down'
with mock.patch(is_agent_down_str) as mock_is_agent_down:
mock_is_agent_down.return_value = False
with self.network() as net:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net['network']['id'])
self.assertEqual(1, len(dhcp_agents['agents']))
with mock.patch(is_agent_down_str) as mock_is_agent_down:
mock_is_agent_down.return_value = True
with self.network() as net:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net['network']['id'])
self.assertEqual(0, len(dhcp_agents['agents']))
def test_network_scheduler_with_hosted_network(self):
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
self._register_one_agent_state(dhcp_hosta)
agents = self._list_agents()
with self.network() as net1:
dhcp_agents = self._list_dhcp_agents_hosting_network(
net1['network']['id'])
self.assertEqual(1, len(dhcp_agents['agents']))
with mock.patch.object(OVSQuantumPluginV2,
'get_dhcp_agents_hosting_networks',
autospec=True) as mock_hosting_agents:
mock_hosting_agents.return_value = agents['agents']
with self.network(do_delete=False) as net2:
pass
dhcp_agents = self._list_dhcp_agents_hosting_network(
net2['network']['id'])
self.assertEqual(0, len(dhcp_agents['agents']))
def test_network_policy(self):
with self.network() as net1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
self._list_networks_hosted_by_dhcp_agent(
hosta_id, expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._add_network_to_dhcp_agent(
hosta_id, net1['network']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._add_network_to_dhcp_agent(hosta_id,
net1['network']['id'])
self._remove_network_from_dhcp_agent(
hosta_id, net1['network']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._list_dhcp_agents_hosting_network(
net1['network']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
def test_network_add_to_dhcp_agent(self):
with self.network() as net1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
num_before_add = len(
self._list_networks_hosted_by_dhcp_agent(
hosta_id)['networks'])
self._add_network_to_dhcp_agent(hosta_id,
net1['network']['id'])
num_after_add = len(
self._list_networks_hosted_by_dhcp_agent(
hosta_id)['networks'])
self.assertEqual(0, num_before_add)
self.assertEqual(1, num_after_add)
def test_network_remove_from_dhcp_agent(self):
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
self._register_one_agent_state(dhcp_hosta)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
DHCP_HOSTA)
with self.network() as net1:
num_before_remove = len(
self._list_networks_hosted_by_dhcp_agent(
hosta_id)['networks'])
self._remove_network_from_dhcp_agent(hosta_id,
net1['network']['id'])
num_after_remove = len(
self._list_networks_hosted_by_dhcp_agent(
hosta_id)['networks'])
self.assertEqual(1, num_before_remove)
self.assertEqual(0, num_after_remove)
def test_router_auto_schedule_with_hosted(self):
with self.router() as router:
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
self._register_agent_states()
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1, len(l3_agents['agents']))
self.assertEqual(L3_HOSTA, l3_agents['agents'][0]['host'])
def test_router_auto_schedule_with_hosted_2(self):
# one agent hosts one router
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_hosta = {
'binary': 'quantum-l3-agent',
'host': L3_HOSTA,
'topic': 'L3_AGENT',
'configurations': {'use_namespaces': True,
'router_id': None,
'handle_internal_only_routers':
True,
'gateway_external_network_id':
None,
'interface_driver': 'interface_driver',
},
'agent_type': constants.AGENT_TYPE_L3}
l3_hostb = copy.deepcopy(l3_hosta)
l3_hostb['host'] = L3_HOSTB
with self.router() as router1:
self._register_one_agent_state(l3_hosta)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
with self.router() as router2:
self._register_one_agent_state(l3_hostb)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
l3_agents_1 = self._list_l3_agents_hosting_router(
router1['router']['id'])
l3_agents_2 = self._list_l3_agents_hosting_router(
router2['router']['id'])
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
num_hosta_routers = len(hosta_routers['routers'])
hostb_id = self._get_agent_id(
constants.AGENT_TYPE_L3,
L3_HOSTB)
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
num_hostc_routers = len(hostb_routers['routers'])
self.assertEqual(1, num_hosta_routers)
self.assertEqual(1, num_hostc_routers)
self.assertEqual(1, len(l3_agents_1['agents']))
self.assertEqual(1, len(l3_agents_2['agents']))
self.assertEqual(L3_HOSTA, l3_agents_1['agents'][0]['host'])
self.assertEqual(L3_HOSTB, l3_agents_2['agents'][0]['host'])
def test_router_auto_schedule_with_disabled(self):
with contextlib.nested(self.router(),
self.router()):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTB)
self._disable_agent(hosta_id)
# first agent will not host router since it is disabled
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
# second agent will host all the routers since first is disabled.
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
num_hostb_routers = len(hostb_routers['routers'])
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
num_hosta_routers = len(hosta_routers['routers'])
self.assertEqual(2, num_hostb_routers)
self.assertEqual(0, num_hosta_routers)
def test_router_auto_schedule_with_candidates(self):
l3_hosta = {
'binary': 'quantum-l3-agent',
'host': L3_HOSTA,
'topic': 'L3_AGENT',
'configurations': {'use_namespaces': False,
'router_id': None,
'handle_internal_only_routers':
True,
'gateway_external_network_id':
None,
'interface_driver': 'interface_driver',
},
'agent_type': constants.AGENT_TYPE_L3}
with contextlib.nested(self.router(),
self.router()) as (router1, router2):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_hosta['configurations']['router_id'] = router1['router']['id']
self._register_one_agent_state(l3_hosta)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
num_hosta_routers = len(hosta_routers['routers'])
l3_agents_1 = self._list_l3_agents_hosting_router(
router1['router']['id'])
l3_agents_2 = self._list_l3_agents_hosting_router(
router2['router']['id'])
# L3 agent will host only the compatible router.
self.assertEqual(1, num_hosta_routers)
self.assertEqual(1, len(l3_agents_1['agents']))
self.assertEqual(0, len(l3_agents_2['agents']))
def test_router_schedule_with_candidates(self):
l3_hosta = {
'binary': 'quantum-l3-agent',
'host': L3_HOSTA,
'topic': 'L3_AGENT',
'configurations': {'use_namespaces': False,
'router_id': None,
'handle_internal_only_routers':
True,
'gateway_external_network_id':
None,
'interface_driver': 'interface_driver',
},
'agent_type': constants.AGENT_TYPE_L3}
with contextlib.nested(self.router(),
self.router(),
self.subnet(),
self.subnet(cidr='10.0.3.0/24')) as (router1,
router2,
subnet1,
subnet2):
l3_hosta['configurations']['router_id'] = router1['router']['id']
self._register_one_agent_state(l3_hosta)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._router_interface_action('add',
router1['router']['id'],
subnet1['subnet']['id'],
None)
self._router_interface_action('add',
router2['router']['id'],
subnet2['subnet']['id'],
None)
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
num_hosta_routers = len(hosta_routers['routers'])
l3_agents_1 = self._list_l3_agents_hosting_router(
router1['router']['id'])
l3_agents_2 = self._list_l3_agents_hosting_router(
router2['router']['id'])
# L3 agent will host only the compatible router.
self.assertEqual(1, num_hosta_routers)
self.assertEqual(1, len(l3_agents_1['agents']))
self.assertEqual(0, len(l3_agents_2['agents']))
def test_router_without_l3_agents(self):
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
'network_id': s['subnet']['network_id']}
router_req = self.new_create_request('routers', data, self.fmt)
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
l3agents = (
self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers(
self.adminContext, [router['router']['id']]))
self._delete('routers', router['router']['id'])
self.assertEqual(0, len(l3agents))
def test_router_sync_data(self):
with contextlib.nested(self.subnet(),
self.subnet(cidr='10.0.2.0/24'),
self.subnet(cidr='10.0.3.0/24')) as (
s1, s2, s3):
self._register_agent_states()
self._set_net_external(s1['subnet']['network_id'])
data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
'network_id': s1['subnet']['network_id']}
router_req = self.new_create_request('routers', data, self.fmt)
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
self._router_interface_action('add',
router['router']['id'],
s2['subnet']['id'],
None)
self._router_interface_action('add',
router['router']['id'],
s3['subnet']['id'],
None)
l3agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1, len(l3agents['agents']))
agents = self._list_agents()
another_l3_agent_id = None
another_l3_agent_host = None
default = l3agents['agents'][0]['id']
for com in agents['agents']:
if (com['id'] != default and
com['agent_type'] == constants.AGENT_TYPE_L3):
another_l3_agent_id = com['id']
another_l3_agent_host = com['host']
break
self.assertTrue(another_l3_agent_id is not None)
self._add_router_to_l3_agent(another_l3_agent_id,
router['router']['id'],
expected_code=exc.HTTPConflict.code)
self._remove_router_from_l3_agent(default,
router['router']['id'])
self._add_router_to_l3_agent(another_l3_agent_id,
router['router']['id'])
l3agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(another_l3_agent_host,
l3agents['agents'][0]['host'])
self._remove_router_from_l3_agent(another_l3_agent_id,
router['router']['id'])
self._router_interface_action('remove',
router['router']['id'],
s2['subnet']['id'],
None)
l3agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1,
len(l3agents['agents']))
self._router_interface_action('remove',
router['router']['id'],
s3['subnet']['id'],
None)
self._delete('routers', router['router']['id'])
def test_router_add_to_l3_agent(self):
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
num_before_add = len(
self._list_routers_hosted_by_l3_agent(
hosta_id)['routers'])
self._add_router_to_l3_agent(hosta_id,
router1['router']['id'])
hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTB)
self._add_router_to_l3_agent(hostb_id,
router1['router']['id'],
expected_code=exc.HTTPConflict.code)
num_after_add = len(
self._list_routers_hosted_by_l3_agent(
hosta_id)['routers'])
self.assertEqual(0, num_before_add)
self.assertEqual(1, num_after_add)
def test_router_policy(self):
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._list_routers_hosted_by_l3_agent(
hosta_id, expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._add_router_to_l3_agent(
hosta_id, router1['router']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._add_router_to_l3_agent(
hosta_id, router1['router']['id'])
self._remove_router_from_l3_agent(
hosta_id, router1['router']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._list_l3_agents_hosting_router(
router1['router']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
class L3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
test_agent_ext_plugin.AgentDBTestMixIn,
AgentSchedulerTestMixIn,
test_plugin.QuantumDbPluginV2TestCase):
def setUp(self):
plugin = ('quantum.plugins.openvswitch.'
'ovs_quantum_plugin.OVSQuantumPluginV2')
self.dhcp_notifier_cls_p = mock.patch(
'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
'DhcpAgentNotifyAPI')
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
super(L3AgentNotifierTestCase, self).setUp(plugin)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
def test_router_add_to_l3_agent_notification(self):
plugin = manager.QuantumManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._add_router_to_l3_agent(hosta_id,
router1['router']['id'])
routers = plugin.get_sync_data(self.adminContext,
[router1['router']['id']])
mock_l3.assert_called_with(
mock.ANY,
plugin.l3_agent_notifier.make_msg(
'router_added_to_agent',
payload=routers),
topic='l3_agent.hosta')
def test_router_remove_from_l3_agent_notification(self):
plugin = manager.QuantumManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._add_router_to_l3_agent(hosta_id,
router1['router']['id'])
self._remove_router_from_l3_agent(hosta_id,
router1['router']['id'])
mock_l3.assert_called_with(
mock.ANY, plugin.l3_agent_notifier.make_msg(
'router_removed_from_agent',
payload={'router_id': router1['router']['id']}),
topic='l3_agent.hosta')
def test_agent_updated_l3_agent_notification(self):
plugin = manager.QuantumManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
mock_l3.assert_called_with(
mock.ANY, plugin.l3_agent_notifier.make_msg(
'agent_updated',
payload={'admin_state_up': False}),
topic='l3_agent.hosta')
class AgentSchedulerTestCaseXML(AgentSchedulerTestCase):
fmt = 'xml'

View File

@ -62,28 +62,17 @@ class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2,
supported_extension_aliases = ["agent"] supported_extension_aliases = ["agent"]
class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): class AgentDBTestMixIn(object):
fmt = 'json'
def setUp(self):
self.adminContext = context.get_admin_context()
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = AgentTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(AgentDBTestCase, self).setUp()
def _list_agents(self, expected_res_status=None, def _list_agents(self, expected_res_status=None,
quantum_context=None, quantum_context=None,
query_string=None): query_string=None):
comp_res = self._list('agents', agent_res = self._list('agents',
quantum_context=quantum_context, quantum_context=quantum_context,
query_params=query_string) query_params=query_string)
if expected_res_status: if expected_res_status:
self.assertEqual(comp_res.status_int, expected_res_status) self.assertEqual(agent_res.status_int, expected_res_status)
return comp_res return agent_res
def _register_agent_states(self): def _register_agent_states(self):
"""Register two L3 agents and two DHCP agents.""" """Register two L3 agents and two DHCP agents."""
@ -123,6 +112,21 @@ class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
agent_state={'agent_state': dhcp_hostc}) agent_state={'agent_state': dhcp_hostc})
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
class AgentDBTestCase(AgentDBTestMixIn,
test_db_plugin.QuantumDbPluginV2TestCase):
fmt = 'json'
def setUp(self):
self.adminContext = context.get_admin_context()
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = AgentTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(AgentDBTestCase, self).setUp()
def test_create_agent(self): def test_create_agent(self):
data = {'agent': {}} data = {'agent': {}}
_req = self.new_create_request('agents', data, self.fmt) _req = self.new_create_request('agents', data, self.fmt)

View File

@ -25,7 +25,7 @@ class TestDhcpRpcCallackMixin(testtools.TestCase):
super(TestDhcpRpcCallackMixin, self).setUp() super(TestDhcpRpcCallackMixin, self).setUp()
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin') self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start() get_plugin = self.plugin_p.start()
self.plugin = mock.Mock() self.plugin = mock.MagicMock()
get_plugin.return_value = self.plugin get_plugin.return_value = self.plugin
self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin() self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin()
self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG') self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG')

View File

@ -88,7 +88,7 @@ class TestBasicRouterOperations(testtools.TestCase):
def testRouterInfoCreate(self): def testRouterInfoCreate(self):
id = _uuid() id = _uuid()
ri = l3_agent.RouterInfo(id, self.conf.root_helper, ri = l3_agent.RouterInfo(id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces, None)
self.assertTrue(ri.ns_name().endswith(id)) self.assertTrue(ri.ns_name().endswith(id))
@ -100,7 +100,7 @@ class TestBasicRouterOperations(testtools.TestCase):
router_id = _uuid() router_id = _uuid()
network_id = _uuid() network_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
cidr = '99.0.1.9/24' cidr = '99.0.1.9/24'
mac = 'ca:fe:de:ad:be:ef' mac = 'ca:fe:de:ad:be:ef'
@ -128,7 +128,7 @@ class TestBasicRouterOperations(testtools.TestCase):
def _test_external_gateway_action(self, action): def _test_external_gateway_action(self, action):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16'] internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
@ -172,7 +172,7 @@ class TestBasicRouterOperations(testtools.TestCase):
def _test_floating_ip_action(self, action): def _test_floating_ip_action(self, action):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
floating_ip = '20.0.0.100' floating_ip = '20.0.0.100'
fixed_ip = '10.0.0.23' fixed_ip = '10.0.0.23'
@ -227,7 +227,8 @@ class TestBasicRouterOperations(testtools.TestCase):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces,
None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
fake_route1 = {'destination': '135.207.0.0/16', fake_route1 = {'destination': '135.207.0.0/16',
@ -274,7 +275,8 @@ class TestBasicRouterOperations(testtools.TestCase):
router_id = _uuid() router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces) self.conf.use_namespaces,
None)
ri.router = {} ri.router = {}
fake_old_routes = [] fake_old_routes = []

View File

@ -29,6 +29,7 @@ from webob import exc
import webtest import webtest
from quantum.api import extensions from quantum.api import extensions
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.common import config from quantum.common import config
from quantum.common import constants as l3_constants from quantum.common import constants as l3_constants
@ -37,7 +38,6 @@ from quantum.common.test_lib import test_config
from quantum import context from quantum import context
from quantum.db import db_base_plugin_v2 from quantum.db import db_base_plugin_v2
from quantum.db import l3_db from quantum.db import l3_db
from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.extensions import l3 from quantum.extensions import l3
from quantum.manager import QuantumManager from quantum.manager import QuantumManager
@ -307,24 +307,7 @@ class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2,
return super(TestL3NatPlugin, self).delete_port(context, id) return super(TestL3NatPlugin, self).delete_port(context, id)
class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): class L3NatTestCaseMixin(object):
def setUp(self):
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = L3TestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(L3NatTestCaseBase, self).setUp()
# Set to None to reload the drivers
notifier_api._drivers = None
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
def tearDown(self):
test_notifier.NOTIFICATIONS = []
super(L3NatTestCaseBase, self).tearDown()
def _create_network(self, fmt, name, admin_state_up, **kwargs): def _create_network(self, fmt, name, admin_state_up, **kwargs):
""" Override the routine for allowing the router:external attribute """ """ Override the routine for allowing the router:external attribute """
@ -334,7 +317,7 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
kwargs), kwargs),
kwargs.values())) kwargs.values()))
arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,) arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,)
return super(L3NatTestCaseBase, self)._create_network( return super(L3NatTestCaseMixin, self)._create_network(
fmt, name, admin_state_up, arg_list=arg_list, **new_args) fmt, name, admin_state_up, arg_list=arg_list, **new_args)
def _create_router(self, fmt, tenant_id, name=None, def _create_router(self, fmt, tenant_id, name=None,
@ -505,6 +488,27 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
public_sub['subnet']['network_id']) public_sub['subnet']['network_id'])
class L3NatTestCaseBase(L3NatTestCaseMixin,
test_db_plugin.QuantumDbPluginV2TestCase):
def setUp(self):
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = L3TestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(L3NatTestCaseBase, self).setUp()
# Set to None to reload the drivers
notifier_api._drivers = None
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
def tearDown(self):
test_notifier.NOTIFICATIONS = []
super(L3NatTestCaseBase, self).tearDown()
class L3NatDBTestCase(L3NatTestCaseBase): class L3NatDBTestCase(L3NatTestCaseBase):
def test_router_create(self): def test_router_create(self):
@ -1459,7 +1463,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
def _test_notify_op_agent(self, target_func, *args): def _test_notify_op_agent(self, target_func, *args):
l3_rpc_agent_api_str = ( l3_rpc_agent_api_str = (
'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI') 'quantum.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
oldNotify = l3_rpc_agent_api.L3AgentNotify oldNotify = l3_rpc_agent_api.L3AgentNotify
try: try:
with mock.patch(l3_rpc_agent_api_str) as notifyApi: with mock.patch(l3_rpc_agent_api_str) as notifyApi:

View File

@ -19,8 +19,12 @@ sitepackages = True
downloadcache = ~/cache/pip downloadcache = ~/cache/pip
[testenv:pep8] [testenv:pep8]
# E712 comparison to False should be 'if cond is False:' or 'if not cond:'
# query = query.filter(Component.disabled == False)
# E125 continuation line does not distinguish itself from next logical line
commands = commands =
pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,openstack,*egg . pep8 --repeat --show-source --ignore=E125,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg .
pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin
[testenv:i18n] [testenv:i18n]