Merge "Add scheduling feature basing on agent management extension"
This commit is contained in:
commit
4ec44007e2
@ -58,5 +58,14 @@
|
||||
"update_agent": "rule:admin_only",
|
||||
"delete_agent": "rule:admin_only",
|
||||
"get_agent": "rule:admin_only",
|
||||
"get_agents": "rule:admin_only"
|
||||
"get_agents": "rule:admin_only",
|
||||
|
||||
"create_dhcp-network": "rule:admin_only",
|
||||
"delete_dhcp-network": "rule:admin_only",
|
||||
"get_dhcp-networks": "rule:admin_only",
|
||||
"create_l3-router": "rule:admin_only",
|
||||
"delete_l3-router": "rule:admin_only",
|
||||
"get_l3-routers": "rule:admin_only",
|
||||
"get_dhcp-agents": "rule:admin_only",
|
||||
"get_l3-agents": "rule:admin_only"
|
||||
}
|
||||
|
@ -198,6 +198,27 @@ notification_topics = notifications
|
||||
# Maximum number of fixed ips per port
|
||||
# max_fixed_ips_per_port = 5
|
||||
|
||||
# =========== items for agent management extension =============
|
||||
# Seconds to regard the agent as down.
|
||||
# agent_down_time = 5
|
||||
# =========== end of items for agent management extension =====
|
||||
|
||||
# =========== items for agent scheduler extension =============
|
||||
# Driver to use for scheduling network to DHCP agent
|
||||
# network_scheduler_driver = quantum.scheduler.dhcp_agent_scheduler.ChanceScheduler
|
||||
# Driver to use for scheduling router to a default L3 agent
|
||||
# router_scheduler_driver = quantum.scheduler.l3_agent_scheduler.ChanceScheduler
|
||||
|
||||
# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
|
||||
# networks to first DHCP agent which sends get_active_networks message to
|
||||
# quantum server
|
||||
# network_auto_schedule = True
|
||||
|
||||
# Allow auto scheduling routers to L3 agent. It will schedule non-hosted
|
||||
# routers to first L3 agent which sends sync_routers message to quantum server
|
||||
# router_auto_schedule = True
|
||||
# =========== end of items for agent scheduler extension =====
|
||||
|
||||
[QUOTAS]
|
||||
# resource name(s) that are supported in quota features
|
||||
# quota_items = network,subnet,port
|
||||
@ -217,11 +238,6 @@ notification_topics = notifications
|
||||
# default driver to use for quota checks
|
||||
# quota_driver = quantum.quota.ConfDriver
|
||||
|
||||
# =========== items for agent management extension =============
|
||||
# Seconds to regard the agent as down.
|
||||
# agent_down_time = 5
|
||||
# =========== end of items for agent management extension =====
|
||||
|
||||
[DEFAULT_SERVICETYPE]
|
||||
# Description of the default service type (optional)
|
||||
# description = "default service type"
|
||||
|
@ -320,7 +320,7 @@ class DhcpPluginApi(proxy.RpcProxy):
|
||||
super(DhcpPluginApi, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
self.context = context
|
||||
self.host = socket.gethostname()
|
||||
self.host = cfg.CONF.host
|
||||
|
||||
def get_active_networks(self):
|
||||
"""Make a remote process call to retrieve the active networks."""
|
||||
@ -685,6 +685,11 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
||||
if self.agent_state.pop('start_flag', None):
|
||||
self.run()
|
||||
|
||||
def agent_updated(self, context, payload):
|
||||
"""Handle the agent_updated notification event."""
|
||||
self.needs_resync = True
|
||||
LOG.info(_("agent_updated by server side %s!"), payload)
|
||||
|
||||
def after_start(self):
|
||||
LOG.info(_("DHCP agent started"))
|
||||
|
||||
|
@ -93,7 +93,7 @@ class L3PluginApi(proxy.RpcProxy):
|
||||
|
||||
class RouterInfo(object):
|
||||
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router=None):
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router):
|
||||
self.router_id = router_id
|
||||
self.ex_gw_port = None
|
||||
self.internal_ports = []
|
||||
@ -227,7 +227,7 @@ class L3NATAgent(manager.Manager):
|
||||
else:
|
||||
raise
|
||||
|
||||
def _router_added(self, router_id, router=None):
|
||||
def _router_added(self, router_id, router):
|
||||
ri = RouterInfo(router_id, self.root_helper,
|
||||
self.conf.use_namespaces, router)
|
||||
self.router_info[router_id] = ri
|
||||
@ -242,6 +242,10 @@ class L3NATAgent(manager.Manager):
|
||||
|
||||
def _router_removed(self, router_id):
|
||||
ri = self.router_info[router_id]
|
||||
ri.router['gw_port'] = None
|
||||
ri.router[l3_constants.INTERFACE_KEY] = []
|
||||
ri.router[l3_constants.FLOATINGIP_KEY] = []
|
||||
self.process_router(ri)
|
||||
for c, r in self.metadata_filter_rules():
|
||||
ri.iptables_manager.ipv4['filter'].remove_rule(c, r)
|
||||
for c, r in self.metadata_nat_rules():
|
||||
@ -568,7 +572,13 @@ class L3NATAgent(manager.Manager):
|
||||
LOG.debug(msg)
|
||||
self.fullsync = True
|
||||
|
||||
def _process_routers(self, routers):
|
||||
def router_removed_from_agent(self, context, payload):
|
||||
self.router_deleted(context, payload['router_id'])
|
||||
|
||||
def router_added_to_agent(self, context, payload):
|
||||
self.routers_updated(context, payload)
|
||||
|
||||
def _process_routers(self, routers, all_routers=False):
|
||||
if (self.conf.external_network_bridge and
|
||||
not ip_lib.device_exists(self.conf.external_network_bridge)):
|
||||
LOG.error(_("The external network bridge '%s' does not exist"),
|
||||
@ -576,7 +586,17 @@ class L3NATAgent(manager.Manager):
|
||||
return
|
||||
|
||||
target_ex_net_id = self._fetch_external_net_id()
|
||||
|
||||
# if routers are all the routers we have (They are from router sync on
|
||||
# starting or when error occurs during running), we seek the
|
||||
# routers which should be removed.
|
||||
# If routers are from server side notification, we seek them
|
||||
# from subset of incoming routers and ones we have now.
|
||||
if all_routers:
|
||||
prev_router_ids = set(self.router_info)
|
||||
else:
|
||||
prev_router_ids = set(self.router_info) & set(
|
||||
[router['id'] for router in routers])
|
||||
cur_router_ids = set()
|
||||
for r in routers:
|
||||
if not r['admin_state_up']:
|
||||
continue
|
||||
@ -593,13 +613,15 @@ class L3NATAgent(manager.Manager):
|
||||
|
||||
if ex_net_id and ex_net_id != target_ex_net_id:
|
||||
continue
|
||||
|
||||
cur_router_ids.add(r['id'])
|
||||
if r['id'] not in self.router_info:
|
||||
self._router_added(r['id'])
|
||||
|
||||
self._router_added(r['id'], r)
|
||||
ri = self.router_info[r['id']]
|
||||
ri.router = r
|
||||
self.process_router(ri)
|
||||
# identify and remove routers that no longer exist
|
||||
for router_id in prev_router_ids - cur_router_ids:
|
||||
self._router_removed(router_id)
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _sync_routers_task(self, context):
|
||||
@ -613,8 +635,7 @@ class L3NATAgent(manager.Manager):
|
||||
router_id = None
|
||||
routers = self.plugin_rpc.get_routers(
|
||||
context, router_id)
|
||||
self.router_info = {}
|
||||
self._process_routers(routers)
|
||||
self._process_routers(routers, all_routers=True)
|
||||
self.fullsync = False
|
||||
except Exception:
|
||||
LOG.exception(_("Failed synchronizing routers"))
|
||||
@ -704,6 +725,11 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
||||
except Exception:
|
||||
LOG.exception(_("Failed reporting state!"))
|
||||
|
||||
def agent_updated(self, context, payload):
|
||||
"""Handle the agent_updated notification event."""
|
||||
self.fullsync = True
|
||||
LOG.info(_("agent_updated by server side %s!"), payload)
|
||||
|
||||
|
||||
def main():
|
||||
eventlet.monkey_patch()
|
||||
|
@ -13,7 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.common import topics
|
||||
from quantum.common import utils
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
@ -40,11 +43,35 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
super(DhcpAgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification(self, context, method, payload):
|
||||
def _get_dhcp_agents(self, context, network_id):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
|
||||
context, [network_id], active=True)
|
||||
return [(dhcp_agent.host, dhcp_agent.topic) for
|
||||
dhcp_agent in dhcp_agents]
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
"""Notify the agent on host"""
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topics.DHCP_AGENT, host))
|
||||
|
||||
def _notification(self, context, method, payload, network_id):
|
||||
"""Notify all the agents that are hosting the network"""
|
||||
# By now, we have no scheduling feature, so we fanout
|
||||
# to all of the DHCP agents
|
||||
self._notification_fanout(context, method, payload)
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
if (method != 'network_delete_end' and utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)):
|
||||
for (host, topic) in self._get_dhcp_agents(context, network_id):
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topic, host))
|
||||
else:
|
||||
# besides the non-agentscheduler plugin,
|
||||
# There is no way to query who is hosting the network
|
||||
# when the network is deleted, so we need to fanout
|
||||
self._notification_fanout(context, method, payload)
|
||||
|
||||
def _notification_fanout(self, context, method, payload):
|
||||
"""Fanout the payload to all dhcp agents"""
|
||||
@ -53,6 +80,19 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
payload=payload),
|
||||
topic=topics.DHCP_AGENT)
|
||||
|
||||
def network_removed_from_agent(self, context, network_id, host):
|
||||
self._notification_host(context, 'network_delete_end',
|
||||
{'network_id': network_id}, host)
|
||||
|
||||
def network_added_to_agent(self, context, network_id, host):
|
||||
self._notification_host(context, 'network_create_end',
|
||||
{'network': {'id': network_id}}, host)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._notification_host(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up},
|
||||
host)
|
||||
|
||||
def notify(self, context, data, methodname):
|
||||
# data is {'key' : 'value'} with only one key
|
||||
if methodname not in self.VALID_METHOD_NAMES:
|
||||
@ -61,10 +101,18 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
if obj_type not in self.VALID_RESOURCES:
|
||||
return
|
||||
obj_value = data[obj_type]
|
||||
network_id = None
|
||||
if obj_type == 'network' and 'id' in obj_value:
|
||||
network_id = obj_value['id']
|
||||
elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
|
||||
network_id = obj_value['network_id']
|
||||
if not network_id:
|
||||
return
|
||||
methodname = methodname.replace(".", "_")
|
||||
if methodname.endswith("_delete_end"):
|
||||
if 'id' in obj_value:
|
||||
self._notification(context, methodname,
|
||||
{obj_type + '_id': obj_value['id']})
|
||||
{obj_type + '_id': obj_value['id']},
|
||||
network_id)
|
||||
else:
|
||||
self._notification(context, methodname, data)
|
||||
self._notification(context, methodname, data, network_id)
|
||||
|
120
quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py
Normal file
120
quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py
Normal file
@ -0,0 +1,120 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.common import topics
|
||||
from quantum.common import utils
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L3AgentNotifyAPI(proxy.RpcProxy):
|
||||
"""API for plugin to notify L3 agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=topics.L3_AGENT):
|
||||
super(L3AgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
"""Notify the agent that is hosting the router"""
|
||||
LOG.debug(_('Nofity agent at %(host)s the message '
|
||||
'%(method)s'), {'host': host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topics.L3_AGENT, host))
|
||||
|
||||
def _agent_notification(self, context, method, routers,
|
||||
operation, data):
|
||||
"""Notify changed routers to hosting l3 agents.
|
||||
|
||||
Adjust routers according to l3 agents' role and
|
||||
related dhcp agents.
|
||||
Notify dhcp agent to get right subnet's gateway ips.
|
||||
"""
|
||||
adminContext = context.is_admin and context or context.elevated()
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
adminContext, [router['id']],
|
||||
admin_state_up=True,
|
||||
active=True)
|
||||
for l3_agent in l3_agents:
|
||||
LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
|
||||
'%(method)s'),
|
||||
{'topic': l3_agent.topic,
|
||||
'host': l3_agent.host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
routers=[router]),
|
||||
topic='%s.%s' % (l3_agent.topic, l3_agent.host))
|
||||
|
||||
def _notification(self, context, method, routers, operation, data):
|
||||
"""Notify all the agents that are hosting the routers"""
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
|
||||
adminContext = (context.is_admin and
|
||||
context or context.elevated())
|
||||
plugin.schedule_routers(adminContext, routers)
|
||||
self._agent_notification(
|
||||
context, method, routers, operation, data)
|
||||
else:
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
routers=routers),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def _notification_fanout(self, context, method, router_id):
|
||||
"""Fanout the deleted router to all L3 agents"""
|
||||
LOG.debug(_('Fanout notify agent at %(topic)s the message '
|
||||
'%(method)s on router %(router_id)s'),
|
||||
{'topic': topics.DHCP_AGENT,
|
||||
'method': method,
|
||||
'router_id': router_id})
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
router_id=router_id),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._notification_host(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up},
|
||||
host)
|
||||
|
||||
def router_deleted(self, context, router_id):
|
||||
self._notification_fanout(context, 'router_deleted', router_id)
|
||||
|
||||
def routers_updated(self, context, routers, operation=None, data=None):
|
||||
if routers:
|
||||
self._notification(context, 'routers_updated', routers,
|
||||
operation, data)
|
||||
|
||||
def router_removed_from_agent(self, context, router_id, host):
|
||||
self._notification_host(context, 'router_removed_from_agent',
|
||||
{'router_id': router_id}, host)
|
||||
|
||||
def router_added_to_agent(self, context, routers, host):
|
||||
self._notification_host(context, 'router_added_to_agent',
|
||||
routers, host)
|
||||
|
||||
L3AgentNotify = L3AgentNotifyAPI()
|
@ -66,3 +66,5 @@ PAGINATION_INFINITE = 'infinite'
|
||||
|
||||
SORT_DIRECTION_ASC = 'asc'
|
||||
SORT_DIRECTION_DESC = 'desc'
|
||||
|
||||
AGENT_SCHEDULER_EXT_ALIAS = 'agent_scheduler'
|
||||
|
@ -183,3 +183,8 @@ def diff_list_of_dict(old_list, new_list):
|
||||
added = new_set - old_set
|
||||
removed = old_set - new_set
|
||||
return [str2dict(a) for a in added], [str2dict(r) for r in removed]
|
||||
|
||||
|
||||
def is_extension_supported(plugin, ext_alias):
|
||||
return ext_alias in getattr(
|
||||
plugin, "supported_extension_aliases", [])
|
||||
|
@ -67,24 +67,30 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
raise ext_agent.AgentNotFound(id=id)
|
||||
return agent
|
||||
|
||||
def _is_agent_down(self, heart_beat_time_str):
|
||||
return timeutils.is_older_than(heart_beat_time_str,
|
||||
@classmethod
|
||||
def is_agent_down(cls, heart_beat_time):
|
||||
return timeutils.is_older_than(heart_beat_time,
|
||||
cfg.CONF.agent_down_time)
|
||||
|
||||
def get_configuration_dict(self, agent_db):
|
||||
try:
|
||||
conf = jsonutils.loads(agent_db.configurations)
|
||||
except Exception:
|
||||
msg = _('Configuration for agent %(agent_type)s on host %(host)s'
|
||||
' is invalid.')
|
||||
LOG.warn(msg, {'agent_type': agent_db.agent_type,
|
||||
'host': agent_db.host})
|
||||
conf = {}
|
||||
return conf
|
||||
|
||||
def _make_agent_dict(self, agent, fields=None):
|
||||
attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
|
||||
ext_agent.RESOURCE_NAME + 's')
|
||||
res = dict((k, agent[k]) for k in attr
|
||||
if k not in ['alive', 'configurations'])
|
||||
res['alive'] = not self._is_agent_down(res['heartbeat_timestamp'])
|
||||
try:
|
||||
res['configurations'] = jsonutils.loads(agent['configurations'])
|
||||
except Exception:
|
||||
msg = _('Configurations for agent %(agent_type)s on host %(host)s'
|
||||
' are invalid.')
|
||||
LOG.warn(msg, {'agent_type': res['agent_type'],
|
||||
'host': res['host']})
|
||||
res['configurations'] = {}
|
||||
res['alive'] = not AgentDbMixin.is_agent_down(
|
||||
res['heartbeat_timestamp'])
|
||||
res['configurations'] = self.get_configuration_dict(agent)
|
||||
return self._fields(res, fields)
|
||||
|
||||
def delete_agent(self, context, id):
|
||||
@ -99,6 +105,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
agent.update(agent_data)
|
||||
return self._make_agent_dict(agent)
|
||||
|
||||
def get_agents_db(self, context, filters=None):
|
||||
query = self._get_collection_query(context, Agent, filters=filters)
|
||||
return query.all()
|
||||
|
||||
def get_agents(self, context, filters=None, fields=None):
|
||||
return self._get_collection(context, Agent,
|
||||
self._make_agent_dict,
|
||||
|
363
quantum/db/agentschedulers_db.py
Normal file
363
quantum/db/agentschedulers_db.py
Normal file
@ -0,0 +1,363 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import constants
|
||||
from quantum.db import agents_db
|
||||
from quantum.db import model_base
|
||||
from quantum.db import models_v2
|
||||
from quantum.extensions import agentscheduler
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import uuidutils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NetworkDhcpAgentBinding(model_base.BASEV2):
|
||||
"""Represents binding between quantum networks and DHCP agents"""
|
||||
network_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("networks.id", ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
dhcp_agent = orm.relation(agents_db.Agent)
|
||||
dhcp_agent_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("agents.id",
|
||||
ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
|
||||
|
||||
class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
|
||||
"""Represents binding between quantum routers and L3 agents"""
|
||||
router_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("routers.id", ondelete='CASCADE'))
|
||||
l3_agent = orm.relation(agents_db.Agent)
|
||||
l3_agent_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("agents.id",
|
||||
ondelete='CASCADE'))
|
||||
|
||||
|
||||
class AgentSchedulerDbMixin(agentscheduler.AgentSchedulerPluginBase):
|
||||
"""Mixin class to add agent scheduler extension to db_plugin_base_v2."""
|
||||
|
||||
dhcp_agent_notifier = None
|
||||
l3_agent_notifier = None
|
||||
network_scheduler = None
|
||||
router_scheduler = None
|
||||
|
||||
def get_dhcp_agents_hosting_networks(
|
||||
self, context, network_ids, active=None):
|
||||
if not network_ids:
|
||||
return []
|
||||
query = context.session.query(NetworkDhcpAgentBinding)
|
||||
query = query.options(joinedload('dhcp_agent'))
|
||||
if len(network_ids) == 1:
|
||||
query = query.filter(
|
||||
NetworkDhcpAgentBinding.network_id == network_ids[0])
|
||||
elif network_ids:
|
||||
query = query.filter(
|
||||
NetworkDhcpAgentBinding.network_id in network_ids)
|
||||
if active is not None:
|
||||
query = (query.filter(agents_db.Agent.admin_state_up == active))
|
||||
dhcp_agents = [binding.dhcp_agent for binding in query.all()]
|
||||
if active is not None:
|
||||
dhcp_agents = [dhcp_agent for dhcp_agent in
|
||||
dhcp_agents if not
|
||||
agents_db.AgentDbMixin.is_agent_down(
|
||||
dhcp_agent['heartbeat_timestamp'])]
|
||||
return dhcp_agents
|
||||
|
||||
def add_network_to_dhcp_agent(self, context, id, network_id):
|
||||
self._get_network(context, network_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
agent_db = self._get_agent(context, id)
|
||||
if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or
|
||||
not agent_db['admin_state_up']):
|
||||
raise agentscheduler.InvalidDHCPAgent(id=id)
|
||||
dhcp_agents = self.get_dhcp_agents_hosting_networks(
|
||||
context, [network_id])
|
||||
for dhcp_agent in dhcp_agents:
|
||||
if id == dhcp_agent.id:
|
||||
raise agentscheduler.NetworkHostedByDHCPAgent(
|
||||
network_id=network_id, agent_id=id)
|
||||
binding = NetworkDhcpAgentBinding()
|
||||
binding.dhcp_agent_id = id
|
||||
binding.network_id = network_id
|
||||
context.session.add(binding)
|
||||
if self.dhcp_agent_notifier:
|
||||
self.dhcp_agent_notifier.network_added_to_agent(
|
||||
context, network_id, agent_db.host)
|
||||
|
||||
def remove_network_from_dhcp_agent(self, context, id, network_id):
|
||||
agent = self._get_agent(context, id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
try:
|
||||
query = context.session.query(NetworkDhcpAgentBinding)
|
||||
binding = query.filter(
|
||||
NetworkDhcpAgentBinding.network_id == network_id,
|
||||
NetworkDhcpAgentBinding.dhcp_agent_id == id).one()
|
||||
except exc.NoResultFound:
|
||||
raise agentscheduler.NetworkNotHostedByDhcpAgent(
|
||||
network_id=network_id, agent_id=id)
|
||||
context.session.delete(binding)
|
||||
if self.dhcp_agent_notifier:
|
||||
self.dhcp_agent_notifier.network_removed_from_agent(
|
||||
context, network_id, agent.host)
|
||||
|
||||
def list_networks_on_dhcp_agent(self, context, id):
|
||||
query = context.session.query(NetworkDhcpAgentBinding.network_id)
|
||||
net_ids = query.filter(
|
||||
NetworkDhcpAgentBinding.dhcp_agent_id == id).all()
|
||||
if net_ids:
|
||||
_ids = [item[0] for item in net_ids]
|
||||
return {'networks':
|
||||
self.get_networks(context, filters={'id': _ids})}
|
||||
else:
|
||||
return {'networks': []}
|
||||
|
||||
def list_active_networks_on_active_dhcp_agent(self, context, host):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_DHCP, host)
|
||||
if not agent.admin_state_up:
|
||||
return []
|
||||
query = context.session.query(NetworkDhcpAgentBinding.network_id)
|
||||
net_ids = query.filter(
|
||||
NetworkDhcpAgentBinding.dhcp_agent_id == agent.id).all()
|
||||
if net_ids:
|
||||
_ids = [item[0] for item in net_ids]
|
||||
return self.get_networks(
|
||||
context, filters={'id': _ids, 'admin_state_up': [True]})
|
||||
else:
|
||||
return []
|
||||
|
||||
def list_dhcp_agents_hosting_network(self, context, network_id):
|
||||
dhcp_agents = self.get_dhcp_agents_hosting_networks(
|
||||
context, [network_id])
|
||||
agent_ids = [dhcp_agent.id for dhcp_agent in dhcp_agents]
|
||||
if agent_ids:
|
||||
return {
|
||||
'agents':
|
||||
self.get_agents(context, filters={'id': agent_ids})}
|
||||
else:
|
||||
return {'agents': []}
|
||||
|
||||
def add_router_to_l3_agent(self, context, id, router_id):
|
||||
"""Add a l3 agent to host a router.
|
||||
"""
|
||||
router = self.get_router(context, router_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
agent_db = self._get_agent(context, id)
|
||||
if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
|
||||
not agent_db['admin_state_up'] or
|
||||
not self.get_l3_agent_candidates(router, [agent_db])):
|
||||
raise agentscheduler.InvalidL3Agent(id=id)
|
||||
query = context.session.query(RouterL3AgentBinding)
|
||||
try:
|
||||
binding = query.filter(
|
||||
RouterL3AgentBinding.l3_agent_id == agent_db.id,
|
||||
RouterL3AgentBinding.router_id == router_id).one()
|
||||
if binding:
|
||||
raise agentscheduler.RouterHostedByL3Agent(
|
||||
router_id=router_id, agent_id=id)
|
||||
except exc.NoResultFound:
|
||||
pass
|
||||
|
||||
result = self.auto_schedule_routers(context,
|
||||
agent_db.host,
|
||||
router_id)
|
||||
if not result:
|
||||
raise agentscheduler.RouterSchedulingFailed(
|
||||
router_id=router_id, agent_id=id)
|
||||
|
||||
if self.l3_agent_notifier:
|
||||
routers = self.get_sync_data(context, [router_id])
|
||||
self.l3_agent_notifier.router_added_to_agent(
|
||||
context, routers, agent_db.host)
|
||||
|
||||
def remove_router_from_l3_agent(self, context, id, router_id):
|
||||
"""Remove the router from l3 agent. After it, the router
|
||||
will be non-hosted until there is update which
|
||||
lead to re schedule or be added to another agent manually."""
|
||||
agent = self._get_agent(context, id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
query = context.session.query(RouterL3AgentBinding)
|
||||
query = query.filter(
|
||||
RouterL3AgentBinding.router_id == router_id,
|
||||
RouterL3AgentBinding.l3_agent_id == id)
|
||||
try:
|
||||
binding = query.one()
|
||||
except exc.NoResultFound:
|
||||
raise agentscheduler.RouterNotHostedByL3Agent(
|
||||
router_id=router_id, agent_id=id)
|
||||
context.session.delete(binding)
|
||||
if self.l3_agent_notifier:
|
||||
self.l3_agent_notifier.router_removed_from_agent(
|
||||
context, router_id, agent.host)
|
||||
|
||||
def list_routers_on_l3_agent(self, context, id):
|
||||
query = context.session.query(RouterL3AgentBinding.router_id)
|
||||
router_ids = query.filter(
|
||||
RouterL3AgentBinding.l3_agent_id == id).all()
|
||||
if router_ids:
|
||||
_ids = [item[0] for item in router_ids]
|
||||
return {'routers':
|
||||
self.get_routers(context, filters={'id': _ids})}
|
||||
else:
|
||||
return {'routers': []}
|
||||
|
||||
def list_active_sync_routers_on_active_l3_agent(
|
||||
self, context, host, router_id):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
if not agent.admin_state_up:
|
||||
return []
|
||||
query = context.session.query(RouterL3AgentBinding.router_id)
|
||||
query = query.filter(
|
||||
RouterL3AgentBinding.l3_agent_id == agent.id)
|
||||
if router_id:
|
||||
query = query.filter(RouterL3AgentBinding.router_id == router_id)
|
||||
router_ids = query.all()
|
||||
if router_ids:
|
||||
_ids = [item[0] for item in router_ids]
|
||||
routers = self.get_sync_data(context, router_ids=_ids,
|
||||
active=True)
|
||||
return routers
|
||||
return []
|
||||
|
||||
def get_l3_agents_hosting_routers(self, context, router_ids,
|
||||
admin_state_up=None,
|
||||
active=None):
|
||||
if not router_ids:
|
||||
return []
|
||||
query = context.session.query(RouterL3AgentBinding)
|
||||
if len(router_ids) > 1:
|
||||
query = query.options(joinedload('l3_agent')).filter(
|
||||
RouterL3AgentBinding.router_id.in_(router_ids))
|
||||
else:
|
||||
query = query.options(joinedload('l3_agent')).filter(
|
||||
RouterL3AgentBinding.router_id == router_ids[0])
|
||||
if admin_state_up is not None:
|
||||
query = (query.filter(agents_db.Agent.admin_state_up ==
|
||||
admin_state_up))
|
||||
l3_agents = [binding.l3_agent for binding in query.all()]
|
||||
if active is not None:
|
||||
l3_agents = [l3_agent for l3_agent in
|
||||
l3_agents if not
|
||||
agents_db.AgentDbMixin.is_agent_down(
|
||||
l3_agent['heartbeat_timestamp'])]
|
||||
return l3_agents
|
||||
|
||||
def _get_l3_bindings_hosting_routers(self, context, router_ids):
|
||||
if not router_ids:
|
||||
return []
|
||||
query = context.session.query(RouterL3AgentBinding)
|
||||
if len(router_ids) > 1:
|
||||
query = query.options(joinedload('l3_agent')).filter(
|
||||
RouterL3AgentBinding.router_id.in_(router_ids))
|
||||
else:
|
||||
query = query.options(joinedload('l3_agent')).filter(
|
||||
RouterL3AgentBinding.router_id == router_ids[0])
|
||||
return query.all()
|
||||
|
||||
def list_l3_agents_hosting_router(self, context, router_id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
bindings = self._get_l3_bindings_hosting_routers(
|
||||
context, [router_id])
|
||||
results = []
|
||||
for binding in bindings:
|
||||
l3_agent_dict = self._make_agent_dict(binding.l3_agent)
|
||||
results.append(l3_agent_dict)
|
||||
if results:
|
||||
return {'agents': results}
|
||||
else:
|
||||
return {'agents': []}
|
||||
|
||||
def schedule_network(self, context, request_network, created_network):
|
||||
if self.network_scheduler:
|
||||
result = self.network_scheduler.schedule(
|
||||
self, context, request_network, created_network)
|
||||
if not result:
|
||||
LOG.warn(_('Fail scheduling network %s'), created_network)
|
||||
|
||||
def auto_schedule_networks(self, context, host):
|
||||
if self.network_scheduler:
|
||||
self.network_scheduler.auto_schedule_networks(self, context, host)
|
||||
|
||||
def get_l3_agents(self, context, active=None, filters=None):
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(
|
||||
agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
|
||||
if active is not None:
|
||||
query = (query.filter(agents_db.Agent.admin_state_up == active))
|
||||
if filters:
|
||||
for key, value in filters.iteritems():
|
||||
column = getattr(agents_db.Agent, key, None)
|
||||
if column:
|
||||
query = query.filter(column.in_(value))
|
||||
l3_agents = query.all()
|
||||
if active is not None:
|
||||
l3_agents = [l3_agent for l3_agent in
|
||||
l3_agents if not
|
||||
agents_db.AgentDbMixin.is_agent_down(
|
||||
l3_agent['heartbeat_timestamp'])]
|
||||
return l3_agents
|
||||
|
||||
def get_l3_agent_candidates(self, sync_router, l3_agents):
|
||||
"""Get the valid l3 agents for the router from a list of l3_agents"""
|
||||
candidates = []
|
||||
for l3_agent in l3_agents:
|
||||
if not l3_agent.admin_state_up:
|
||||
continue
|
||||
agent_conf = self.get_configuration_dict(l3_agent)
|
||||
router_id = agent_conf.get('router_id', None)
|
||||
use_namespaces = agent_conf.get('use_namespaces', True)
|
||||
handle_internal_only_routers = agent_conf.get(
|
||||
'handle_internal_only_routers', True)
|
||||
gateway_external_network_id = agent_conf.get(
|
||||
'gateway_external_network_id', None)
|
||||
if not use_namespaces and router_id != sync_router['id']:
|
||||
continue
|
||||
ex_net_id = (sync_router['external_gateway_info'] or {}).get(
|
||||
'network_id')
|
||||
if ((not ex_net_id and not handle_internal_only_routers) or
|
||||
(ex_net_id and gateway_external_network_id and
|
||||
ex_net_id != gateway_external_network_id)):
|
||||
continue
|
||||
candidates.append(l3_agent)
|
||||
return candidates
|
||||
|
||||
def auto_schedule_routers(self, context, host, router_id):
|
||||
if self.router_scheduler:
|
||||
return self.router_scheduler.auto_schedule_routers(
|
||||
self, context, host, router_id)
|
||||
|
||||
def schedule_router(self, context, router):
|
||||
if self.router_scheduler:
|
||||
return self.router_scheduler.schedule(
|
||||
self, context, router)
|
||||
|
||||
def schedule_routers(self, context, routers):
|
||||
"""Schedule the routers to l3 agents.
|
||||
"""
|
||||
for router in routers:
|
||||
self.schedule_router(context, router)
|
@ -13,9 +13,12 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import constants
|
||||
from quantum.common import utils
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import log as logging
|
||||
|
||||
@ -31,14 +34,24 @@ class DhcpRpcCallbackMixin(object):
|
||||
host = kwargs.get('host')
|
||||
LOG.debug(_('Network list requested from %s'), host)
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
filters = dict(admin_state_up=[True])
|
||||
|
||||
return [net['id'] for net in
|
||||
plugin.get_networks(context, filters=filters)]
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
|
||||
if cfg.CONF.network_auto_schedule:
|
||||
plugin.auto_schedule_networks(context, host)
|
||||
nets = plugin.list_active_networks_on_active_dhcp_agent(
|
||||
context, host)
|
||||
else:
|
||||
filters = dict(admin_state_up=[True])
|
||||
nets = plugin.get_networks(context, filters=filters)
|
||||
return [net['id'] for net in nets]
|
||||
|
||||
def get_network_info(self, context, **kwargs):
|
||||
"""Retrieve and return a extended information about a network."""
|
||||
network_id = kwargs.get('network_id')
|
||||
host = kwargs.get('host')
|
||||
LOG.debug(_('Network %(network_id)s requested from '
|
||||
'%(host)s'), {'network_id': network_id,
|
||||
'host': host})
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
network = plugin.get_network(context, network_id)
|
||||
|
||||
@ -62,7 +75,9 @@ class DhcpRpcCallbackMixin(object):
|
||||
# a device id that combines host and network ids
|
||||
|
||||
LOG.debug(_('Port %(device_id)s for %(network_id)s requested from '
|
||||
'%(host)s'), locals())
|
||||
'%(host)s'), {'device_id': device_id,
|
||||
'network_id': network_id,
|
||||
'host': host})
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
retval = None
|
||||
|
||||
|
@ -154,11 +154,12 @@ class ExtraRoute_db_mixin(l3_db.L3_NAT_db_mixin):
|
||||
context, router['id'])
|
||||
return routers
|
||||
|
||||
def get_sync_data(self, context, router_ids=None):
|
||||
def get_sync_data(self, context, router_ids=None, active=None):
|
||||
"""Query routers and their related floating_ips, interfaces."""
|
||||
with context.session.begin(subtransactions=True):
|
||||
routers = super(ExtraRoute_db_mixin,
|
||||
self).get_sync_data(context, router_ids)
|
||||
self).get_sync_data(context, router_ids,
|
||||
active=active)
|
||||
for router in routers:
|
||||
router['routes'] = self._get_extra_routes_by_router_id(
|
||||
context, router['id'])
|
||||
|
@ -23,11 +23,11 @@ from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.sql import expression as expr
|
||||
|
||||
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import constants as l3_constants
|
||||
from quantum.common import exceptions as q_exc
|
||||
from quantum.db import db_base_plugin_v2
|
||||
from quantum.db import l3_rpc_agent_api
|
||||
from quantum.db import model_base
|
||||
from quantum.db import models_v2
|
||||
from quantum.extensions import l3
|
||||
@ -328,7 +328,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
if len(fixed_ips) != 1:
|
||||
msg = _('Router port must have exactly one fixed IP')
|
||||
raise q_exc.BadRequest(resource='router', msg=msg)
|
||||
subnet = self._get_subnet(context, fixed_ips[0]['subnet_id'])
|
||||
subnet_id = fixed_ips[0]['subnet_id']
|
||||
subnet = self._get_subnet(context, subnet_id)
|
||||
self._check_for_dup_router_subnet(context, router_id,
|
||||
port['network_id'],
|
||||
subnet['id'],
|
||||
@ -360,7 +361,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
'name': ''}})
|
||||
|
||||
routers = self.get_sync_data(context.elevated(), [router_id])
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(
|
||||
context, routers, 'add_router_interface',
|
||||
{'network_id': port['network_id'],
|
||||
'subnet_id': subnet_id})
|
||||
info = {'port_id': port['id'],
|
||||
'subnet_id': port['fixed_ips'][0]['subnet_id']}
|
||||
notifier_api.notify(context,
|
||||
@ -409,6 +413,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
subnet_id = port_db['fixed_ips'][0]['subnet_id']
|
||||
self._confirm_router_interface_not_in_use(
|
||||
context, router_id, subnet_id)
|
||||
_network_id = port_db['network_id']
|
||||
self.delete_port(context, port_db['id'], l3_port_check=False)
|
||||
elif 'subnet_id' in interface_info:
|
||||
subnet_id = interface_info['subnet_id']
|
||||
@ -428,6 +433,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
for p in ports:
|
||||
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
|
||||
port_id = p['id']
|
||||
_network_id = p['network_id']
|
||||
self.delete_port(context, p['id'], l3_port_check=False)
|
||||
found = True
|
||||
break
|
||||
@ -438,7 +444,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
|
||||
subnet_id=subnet_id)
|
||||
routers = self.get_sync_data(context.elevated(), [router_id])
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(
|
||||
context, routers, 'remove_router_interface',
|
||||
{'network_id': _network_id,
|
||||
'subnet_id': subnet_id})
|
||||
notifier_api.notify(context,
|
||||
notifier_api.publisher_id('network'),
|
||||
'router.interface.delete',
|
||||
@ -649,7 +658,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
router_id = floatingip_db['router_id']
|
||||
if router_id:
|
||||
routers = self.get_sync_data(context.elevated(), [router_id])
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
|
||||
'create_floatingip')
|
||||
return self._make_floatingip_dict(floatingip_db)
|
||||
|
||||
def update_floatingip(self, context, id, floatingip):
|
||||
@ -671,7 +681,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
router_ids.append(router_id)
|
||||
if router_ids:
|
||||
routers = self.get_sync_data(context.elevated(), router_ids)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
|
||||
'update_floatingip')
|
||||
return self._make_floatingip_dict(floatingip_db)
|
||||
|
||||
def delete_floatingip(self, context, id):
|
||||
@ -684,7 +695,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
l3_port_check=False)
|
||||
if router_id:
|
||||
routers = self.get_sync_data(context.elevated(), [router_id])
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
|
||||
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
|
||||
'delete_floatingip')
|
||||
|
||||
def get_floatingip(self, context, id, fields=None):
|
||||
floatingip = self._get_floatingip(context, id)
|
||||
@ -816,7 +828,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
else:
|
||||
return [n for n in nets if n['id'] not in ext_nets]
|
||||
|
||||
def _get_sync_routers(self, context, router_ids=None):
|
||||
def _get_sync_routers(self, context, router_ids=None, active=None):
|
||||
"""Query routers and their gw ports for l3 agent.
|
||||
|
||||
Query routers with the router_ids. The gateway ports, if any,
|
||||
@ -831,7 +843,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
"""
|
||||
router_query = context.session.query(Router)
|
||||
if router_ids:
|
||||
router_query = router_query.filter(Router.id.in_(router_ids))
|
||||
if 1 == len(router_ids):
|
||||
router_query = router_query.filter(Router.id == router_ids[0])
|
||||
else:
|
||||
router_query = router_query.filter(Router.id.in_(router_ids))
|
||||
if active is not None:
|
||||
router_query = router_query.filter(Router.admin_state_up == active)
|
||||
routers = router_query.all()
|
||||
gw_port_ids = []
|
||||
if not routers:
|
||||
@ -842,7 +859,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
gw_port_ids.append(gw_port_id)
|
||||
gw_ports = []
|
||||
if gw_port_ids:
|
||||
gw_ports = self._get_sync_gw_ports(context, gw_port_ids)
|
||||
gw_ports = self.get_sync_gw_ports(context, gw_port_ids)
|
||||
gw_port_id_gw_port_dict = {}
|
||||
for gw_port in gw_ports:
|
||||
gw_port_id_gw_port_dict[gw_port['id']] = gw_port
|
||||
@ -862,7 +879,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
return []
|
||||
return self.get_floatingips(context, {'router_id': router_ids})
|
||||
|
||||
def _get_sync_gw_ports(self, context, gw_port_ids):
|
||||
def get_sync_gw_ports(self, context, gw_port_ids):
|
||||
if not gw_port_ids:
|
||||
return []
|
||||
filters = {'id': gw_port_ids}
|
||||
@ -871,12 +888,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
self._populate_subnet_for_ports(context, gw_ports)
|
||||
return gw_ports
|
||||
|
||||
def _get_sync_interfaces(self, context, router_ids):
|
||||
def get_sync_interfaces(self, context, router_ids,
|
||||
device_owner=DEVICE_OWNER_ROUTER_INTF):
|
||||
"""Query router interfaces that relate to list of router_ids."""
|
||||
if not router_ids:
|
||||
return []
|
||||
filters = {'device_id': router_ids,
|
||||
'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
|
||||
'device_owner': [device_owner]}
|
||||
interfaces = self.get_ports(context, filters)
|
||||
if interfaces:
|
||||
self._populate_subnet_for_ports(context, interfaces)
|
||||
@ -934,14 +952,15 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||
router[l3_constants.INTERFACE_KEY] = router_interfaces
|
||||
return routers_dict.values()
|
||||
|
||||
def get_sync_data(self, context, router_ids=None):
|
||||
def get_sync_data(self, context, router_ids=None, active=None):
|
||||
"""Query routers and their related floating_ips, interfaces."""
|
||||
with context.session.begin(subtransactions=True):
|
||||
routers = self._get_sync_routers(context,
|
||||
router_ids)
|
||||
router_ids=router_ids,
|
||||
active=active)
|
||||
router_ids = [router['id'] for router in routers]
|
||||
floating_ips = self._get_sync_floating_ips(context, router_ids)
|
||||
interfaces = self._get_sync_interfaces(context, router_ids)
|
||||
interfaces = self.get_sync_interfaces(context, router_ids)
|
||||
return self._process_sync_data(routers, interfaces, floating_ips)
|
||||
|
||||
def get_external_network_id(self, context):
|
||||
|
@ -1,50 +0,0 @@
|
||||
# Copyright (c) 2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import topics
|
||||
from quantum.openstack.common import jsonutils
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L3AgentNotifyAPI(proxy.RpcProxy):
|
||||
"""API for plugin to notify L3 agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=topics.L3_AGENT):
|
||||
super(L3AgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def router_deleted(self, context, router_id):
|
||||
LOG.debug(_('Notify agent the router %s is deleted'), router_id)
|
||||
self.cast(context,
|
||||
self.make_msg('router_deleted',
|
||||
router_id=router_id),
|
||||
topic=self.topic)
|
||||
|
||||
def routers_updated(self, context, routers):
|
||||
if routers:
|
||||
LOG.debug(_('Notify agent routers were updated:\n %s'),
|
||||
jsonutils.dumps(routers, indent=5))
|
||||
self.cast(context,
|
||||
self.make_msg('routers_updated',
|
||||
routers=routers),
|
||||
topic=self.topic)
|
||||
|
||||
|
||||
L3AgentNotify = L3AgentNotifyAPI()
|
@ -13,6 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.common import utils
|
||||
from quantum import context as quantum_context
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import jsonutils
|
||||
@ -34,10 +38,17 @@ class L3RpcCallbackMixin(object):
|
||||
with their interfaces and floating_ips
|
||||
"""
|
||||
router_id = kwargs.get('router_id')
|
||||
# TODO(gongysh) we will use host in kwargs for multi host BP
|
||||
host = kwargs.get('host')
|
||||
context = quantum_context.get_admin_context()
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
routers = plugin.get_sync_data(context, router_id)
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
|
||||
if cfg.CONF.router_auto_schedule:
|
||||
plugin.auto_schedule_routers(context, host, router_id)
|
||||
routers = plugin.list_active_sync_routers_on_active_l3_agent(
|
||||
context, host, router_id)
|
||||
else:
|
||||
routers = plugin.get_sync_data(context, router_id)
|
||||
LOG.debug(_("Routers returned to l3 agent:\n %s"),
|
||||
jsonutils.dumps(routers, indent=5))
|
||||
return routers
|
||||
|
@ -0,0 +1,79 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""agent scheduler
|
||||
|
||||
Revision ID: 4692d074d587
|
||||
Revises: 3b54bf9e29f7
|
||||
Create Date: 2013-02-21 23:01:50.370306
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '4692d074d587'
|
||||
down_revision = '3b54bf9e29f7'
|
||||
|
||||
# Change to ['*'] if this migration applies to all plugins
|
||||
|
||||
migration_for_plugins = [
|
||||
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2'
|
||||
]
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
from quantum.db import migration
|
||||
|
||||
|
||||
def upgrade(active_plugin=None, options=None):
|
||||
if not migration.should_run(active_plugin, migration_for_plugins):
|
||||
return
|
||||
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
'networkdhcpagentbindings',
|
||||
sa.Column('network_id', sa.String(length=36), nullable=False),
|
||||
sa.Column('dhcp_agent_id', sa.String(length=36), nullable=False),
|
||||
sa.ForeignKeyConstraint(['dhcp_agent_id'], ['agents.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.ForeignKeyConstraint(['network_id'], ['networks.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.PrimaryKeyConstraint('network_id', 'dhcp_agent_id')
|
||||
)
|
||||
op.create_table(
|
||||
'routerl3agentbindings',
|
||||
sa.Column('id', sa.String(length=36), nullable=False),
|
||||
sa.Column('router_id', sa.String(length=36), nullable=True),
|
||||
sa.Column('l3_agent_id', sa.String(length=36), nullable=True),
|
||||
sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(active_plugin=None, options=None):
|
||||
if not migration.should_run(active_plugin, migration_for_plugins):
|
||||
return
|
||||
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('routerl3agentbindings')
|
||||
op.drop_table('networkdhcpagentbindings')
|
||||
### end Alembic commands ###
|
252
quantum/extensions/agentscheduler.py
Normal file
252
quantum/extensions/agentscheduler.py
Normal file
@ -0,0 +1,252 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from abc import abstractmethod
|
||||
|
||||
from quantum.api import extensions
|
||||
from quantum.api.v2 import base
|
||||
from quantum.api.v2 import resource
|
||||
from quantum.common import constants
|
||||
from quantum.common import exceptions
|
||||
from quantum.extensions import agent
|
||||
from quantum import manager
|
||||
from quantum import policy
|
||||
from quantum import wsgi
|
||||
|
||||
DHCP_NET = 'dhcp-network'
|
||||
DHCP_NETS = DHCP_NET + 's'
|
||||
DHCP_AGENT = 'dhcp-agent'
|
||||
DHCP_AGENTS = DHCP_AGENT + 's'
|
||||
L3_ROUTER = 'l3-router'
|
||||
L3_ROUTERS = L3_ROUTER + 's'
|
||||
L3_AGENT = 'l3-agent'
|
||||
L3_AGENTS = L3_AGENT + 's'
|
||||
|
||||
|
||||
class NetworkSchedulerController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % DHCP_NETS,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.list_networks_on_dhcp_agent(
|
||||
request.context, kwargs['agent_id'])
|
||||
|
||||
def create(self, request, body, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"create_%s" % DHCP_NET,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.add_network_to_dhcp_agent(
|
||||
request.context, kwargs['agent_id'], body['network_id'])
|
||||
|
||||
def delete(self, request, id, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"delete_%s" % DHCP_NET,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.remove_network_from_dhcp_agent(
|
||||
request.context, kwargs['agent_id'], id)
|
||||
|
||||
|
||||
class RouterSchedulerController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % L3_ROUTERS,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.list_routers_on_l3_agent(
|
||||
request.context, kwargs['agent_id'])
|
||||
|
||||
def create(self, request, body, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"create_%s" % L3_ROUTER,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.add_router_to_l3_agent(
|
||||
request.context,
|
||||
kwargs['agent_id'],
|
||||
body['router_id'])
|
||||
|
||||
def delete(self, request, id, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"delete_%s" % L3_ROUTER,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.remove_router_from_l3_agent(
|
||||
request.context, kwargs['agent_id'], id)
|
||||
|
||||
|
||||
class DhcpAgentsHostingNetworkController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % DHCP_AGENTS,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.list_dhcp_agents_hosting_network(
|
||||
request.context, kwargs['network_id'])
|
||||
|
||||
|
||||
class L3AgentsHostingRouterController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % L3_AGENTS,
|
||||
{},
|
||||
plugin=plugin)
|
||||
return plugin.list_l3_agents_hosting_router(
|
||||
request.context, kwargs['router_id'])
|
||||
|
||||
|
||||
class Agentscheduler(extensions.ExtensionDescriptor):
|
||||
"""Extension class supporting agent scheduler.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return "Agent Schedulers"
|
||||
|
||||
@classmethod
|
||||
def get_alias(cls):
|
||||
return constants.AGENT_SCHEDULER_EXT_ALIAS
|
||||
|
||||
@classmethod
|
||||
def get_description(cls):
|
||||
return "Schedule resources among agents"
|
||||
|
||||
@classmethod
|
||||
def get_namespace(cls):
|
||||
return "http://docs.openstack.org/ext/agent_scheduler/api/v1.0"
|
||||
|
||||
@classmethod
|
||||
def get_updated(cls):
|
||||
return "2013-02-03T10:00:00-00:00"
|
||||
|
||||
@classmethod
|
||||
def get_resources(cls):
|
||||
"""Returns Ext Resources """
|
||||
exts = []
|
||||
parent = dict(member_name="agent",
|
||||
collection_name="agents")
|
||||
controller = resource.Resource(NetworkSchedulerController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
DHCP_NETS, controller, parent))
|
||||
|
||||
controller = resource.Resource(RouterSchedulerController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
L3_ROUTERS, controller, parent))
|
||||
|
||||
parent = dict(member_name="network",
|
||||
collection_name="networks")
|
||||
|
||||
controller = resource.Resource(DhcpAgentsHostingNetworkController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
DHCP_AGENTS, controller, parent))
|
||||
|
||||
parent = dict(member_name="router",
|
||||
collection_name="routers")
|
||||
|
||||
controller = resource.Resource(L3AgentsHostingRouterController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
L3_AGENTS, controller, parent))
|
||||
return exts
|
||||
|
||||
def get_extended_resources(self, version):
|
||||
return {}
|
||||
|
||||
|
||||
class InvalidDHCPAgent(agent.AgentNotFound):
|
||||
message = _("Agent %(id)s is not a valid DHCP Agent or has been disabled")
|
||||
|
||||
|
||||
class NetworkHostedByDHCPAgent(exceptions.Conflict):
|
||||
message = _("The network %(network_id)s has been already hosted"
|
||||
" by the DHCP Agent %(agent_id)s.")
|
||||
|
||||
|
||||
class NetworkNotHostedByDhcpAgent(exceptions.Conflict):
|
||||
message = _("The network %(network_id)s is not hosted"
|
||||
" by the DHCP agent %(agent_id)s.")
|
||||
|
||||
|
||||
class InvalidL3Agent(agent.AgentNotFound):
|
||||
message = _("Agent %(id)s is not a L3 Agent or has been disabled")
|
||||
|
||||
|
||||
class RouterHostedByL3Agent(exceptions.Conflict):
|
||||
message = _("The router %(router_id)s has been already hosted"
|
||||
" by the L3 Agent %(agent_id)s.")
|
||||
|
||||
|
||||
class RouterSchedulingFailed(exceptions.Conflict):
|
||||
message = _("Failed scheduling router %(router_id)s to"
|
||||
" the L3 Agent %(agent_id)s.")
|
||||
|
||||
|
||||
class RouterNotHostedByL3Agent(exceptions.Conflict):
|
||||
message = _("The router %(router_id)s is not hosted"
|
||||
" by L3 agent %(agent_id)s.")
|
||||
|
||||
|
||||
class AgentSchedulerPluginBase(object):
|
||||
""" REST API to operate the agent scheduler.
|
||||
|
||||
All of method must be in an admin context.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def add_network_to_dhcp_agent(self, context, id, network_id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def remove_network_from_dhcp_agent(self, context, id, network_id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_networks_on_dhcp_agent(self, context, id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_dhcp_agents_hosting_network(self, context, network_id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def add_router_to_l3_agent(self, context, id, router_id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def remove_router_from_l3_agent(self, context, id, router_id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_routers_on_l3_agent(self, context, id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_l3_agents_hosting_router(self, context, router_id):
|
||||
pass
|
@ -17,6 +17,7 @@
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.agent.common import config
|
||||
from quantum import scheduler
|
||||
|
||||
|
||||
DEFAULT_BRIDGE_MAPPINGS = []
|
||||
@ -64,3 +65,4 @@ cfg.CONF.register_opts(ovs_opts, "OVS")
|
||||
cfg.CONF.register_opts(agent_opts, "AGENT")
|
||||
config.register_agent_state_opts_helper(cfg.CONF)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS)
|
||||
|
@ -25,12 +25,15 @@ import sys
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.agent import securitygroups_rpc as sg_rpc
|
||||
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import constants as q_const
|
||||
from quantum.common import exceptions as q_exc
|
||||
from quantum.common import rpc as q_rpc
|
||||
from quantum.common import topics
|
||||
from quantum.db import agents_db
|
||||
from quantum.db import agentschedulers_db
|
||||
from quantum.db import db_base_plugin_v2
|
||||
from quantum.db import dhcp_rpc_base
|
||||
from quantum.db import extraroute_db
|
||||
@ -41,6 +44,7 @@ from quantum.db import securitygroups_rpc_base as sg_db_rpc
|
||||
from quantum.extensions import portbindings
|
||||
from quantum.extensions import providernet as provider
|
||||
from quantum.extensions import securitygroup as ext_sg
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import rpc
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
@ -211,7 +215,9 @@ class AgentNotifierApi(proxy.RpcProxy,
|
||||
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcMixin,
|
||||
agents_db.AgentDbMixin):
|
||||
agents_db.AgentDbMixin,
|
||||
agentschedulers_db.AgentSchedulerDbMixin):
|
||||
|
||||
"""Implement the Quantum abstractions using Open vSwitch.
|
||||
|
||||
Depending on whether tunneling is enabled, either a GRE tunnel or
|
||||
@ -238,8 +244,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
|
||||
supported_extension_aliases = ["provider", "router",
|
||||
"binding", "quotas", "security-group",
|
||||
"agent",
|
||||
"extraroute"]
|
||||
"agent", "extraroute", "agent_scheduler"]
|
||||
|
||||
network_view = "extension:provider_network:view"
|
||||
network_set = "extension:provider_network:set"
|
||||
@ -269,12 +274,18 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
"Agent terminated!"))
|
||||
sys.exit(1)
|
||||
self.setup_rpc()
|
||||
self.network_scheduler = importutils.import_object(
|
||||
cfg.CONF.network_scheduler_driver)
|
||||
self.router_scheduler = importutils.import_object(
|
||||
cfg.CONF.router_scheduler_driver)
|
||||
|
||||
def setup_rpc(self):
|
||||
# RPC support
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.callbacks = OVSRpcCallbacks(self.notifier)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
@ -486,6 +497,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
self._extend_network_dict_l3(context, net)
|
||||
# note - exception will rollback entire transaction
|
||||
LOG.debug(_("Created network: %s"), net['id'])
|
||||
self.schedule_network(context, network['network'], net)
|
||||
return net
|
||||
|
||||
def update_network(self, context, id, network):
|
||||
@ -569,6 +581,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
else:
|
||||
self.notifier.security_groups_member_updated(
|
||||
context, port.get(ext_sg.SECURITYGROUPS))
|
||||
net = self.get_network(context, port['network_id'])
|
||||
self.schedule_network(context, None, net)
|
||||
return self._extend_port_dict_binding(context, port)
|
||||
|
||||
def get_port(self, context, id, fields=None):
|
||||
@ -636,3 +650,20 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
|
||||
self.notifier.security_groups_member_updated(
|
||||
context, port.get(ext_sg.SECURITYGROUPS))
|
||||
|
||||
def update_agent(self, context, id, agent):
|
||||
original_agent = self.get_agent(context, id)
|
||||
result = super(OVSQuantumPluginV2, self).update_agent(
|
||||
context, id, agent)
|
||||
agent_data = agent['agent']
|
||||
if ('admin_state_up' in agent_data and
|
||||
original_agent['admin_state_up'] != agent_data['admin_state_up']):
|
||||
if original_agent['agent_type'] == q_const.AGENT_TYPE_DHCP:
|
||||
self.dhcp_agent_notifier.agent_updated(
|
||||
context, agent_data['admin_state_up'],
|
||||
original_agent['host'])
|
||||
elif original_agent['agent_type'] == q_const.AGENT_TYPE_L3:
|
||||
self.l3_agent_notifier.agent_updated(
|
||||
context, agent_data['admin_state_up'],
|
||||
original_agent['host'])
|
||||
return result
|
||||
|
@ -51,6 +51,7 @@ def init():
|
||||
raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
|
||||
# pass _set_brain to read_cached_file so that the policy brain
|
||||
# is reset only if the file has changed
|
||||
LOG.debug(_("loading policy file at %s"), _POLICY_PATH)
|
||||
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
|
||||
reload_func=_set_rules)
|
||||
|
||||
|
34
quantum/scheduler/__init__.py
Normal file
34
quantum/scheduler/__init__.py
Normal file
@ -0,0 +1,34 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
|
||||
AGENTS_SCHEDULER_OPTS = [
|
||||
cfg.StrOpt('network_scheduler_driver',
|
||||
default='quantum.scheduler.'
|
||||
'dhcp_agent_scheduler.ChanceScheduler',
|
||||
help=_('Driver to use for scheduling network to DHCP agent')),
|
||||
cfg.StrOpt('router_scheduler_driver',
|
||||
default='quantum.scheduler.l3_agent_scheduler.ChanceScheduler',
|
||||
help=_('Driver to use for scheduling '
|
||||
'router to a default L3 agent')),
|
||||
cfg.BoolOpt('network_auto_schedule', default=True,
|
||||
help=_('Allow auto scheduling networks to DHCP agent.')),
|
||||
cfg.BoolOpt('router_auto_schedule', default=True,
|
||||
help=_('Allow auto scheduling routers to L3 agent.')),
|
||||
]
|
108
quantum/scheduler/dhcp_agent_scheduler.py
Normal file
108
quantum/scheduler/dhcp_agent_scheduler.py
Normal file
@ -0,0 +1,108 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.sql import exists
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.db import models_v2
|
||||
from quantum.db import agents_db
|
||||
from quantum.db import agentschedulers_db
|
||||
from quantum.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChanceScheduler(object):
|
||||
"""Allocate a DHCP agent for a network in a random way.
|
||||
More sophisticated scheduler (similar to filter scheduler in nova?)
|
||||
can be introduced later."""
|
||||
|
||||
def schedule(self, plugin, context, request_network, network):
|
||||
"""Schedule the network to an active DHCP agent if there
|
||||
is no active DHCP agent hosting it.
|
||||
"""
|
||||
#TODO(gongysh) don't schedule the networks with only
|
||||
# subnets whose enable_dhcp is false
|
||||
with context.session.begin(subtransactions=True):
|
||||
dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
|
||||
context, [network['id']], active=True)
|
||||
if dhcp_agents:
|
||||
LOG.debug(_('Network %s is hosted already'),
|
||||
network['id'])
|
||||
return False
|
||||
enabled_dhcp_agents = plugin.get_agents_db(
|
||||
context, filters={
|
||||
'agent_type': [constants.AGENT_TYPE_DHCP],
|
||||
'admin_state_up': [True]})
|
||||
if not enabled_dhcp_agents:
|
||||
LOG.warn(_('No enabled DHCP agents'))
|
||||
return False
|
||||
active_dhcp_agents = [enabled_dhcp_agent for enabled_dhcp_agent in
|
||||
enabled_dhcp_agents if not
|
||||
agents_db.AgentDbMixin.is_agent_down(
|
||||
enabled_dhcp_agent['heartbeat_timestamp'])]
|
||||
if not active_dhcp_agents:
|
||||
LOG.warn(_('No active DHCP agents'))
|
||||
return False
|
||||
chosen_agent = random.choice(active_dhcp_agents)
|
||||
binding = agentschedulers_db.NetworkDhcpAgentBinding()
|
||||
binding.dhcp_agent = chosen_agent
|
||||
binding.network_id = network['id']
|
||||
context.session.add(binding)
|
||||
LOG.debug(_('Network %(network_id)s is scheduled to be hosted by '
|
||||
'DHCP agent %(agent_id)s'),
|
||||
{'network_id': network['id'],
|
||||
'agent_id': chosen_agent['id']})
|
||||
return True
|
||||
|
||||
def auto_schedule_networks(self, plugin, context, host):
|
||||
"""Schedule non-hosted networks to the DHCP agent on
|
||||
the specified host."""
|
||||
with context.session.begin(subtransactions=True):
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.agent_type ==
|
||||
constants.AGENT_TYPE_DHCP,
|
||||
agents_db.Agent.host == host,
|
||||
agents_db.Agent.admin_state_up == True)
|
||||
try:
|
||||
dhcp_agent = query.one()
|
||||
except (exc.MultipleResultsFound, exc.NoResultFound):
|
||||
LOG.warn(_('No enabled DHCP agent on host %s'),
|
||||
host)
|
||||
return False
|
||||
if agents_db.AgentDbMixin.is_agent_down(
|
||||
dhcp_agent.heartbeat_timestamp):
|
||||
LOG.warn(_('DHCP agent %s is not active'), dhcp_agent.id)
|
||||
#TODO(gongysh) consider the disabled agent's network
|
||||
net_stmt = ~exists().where(
|
||||
models_v2.Network.id ==
|
||||
agentschedulers_db.NetworkDhcpAgentBinding.network_id)
|
||||
net_ids = context.session.query(
|
||||
models_v2.Network.id).filter(net_stmt).all()
|
||||
if not net_ids:
|
||||
LOG.debug(_('No non-hosted networks'))
|
||||
return False
|
||||
for net_id in net_ids:
|
||||
binding = agentschedulers_db.NetworkDhcpAgentBinding()
|
||||
binding.dhcp_agent = dhcp_agent
|
||||
binding.network_id = net_id[0]
|
||||
context.session.add(binding)
|
||||
return True
|
149
quantum/scheduler/l3_agent_scheduler.py
Normal file
149
quantum/scheduler/l3_agent_scheduler.py
Normal file
@ -0,0 +1,149 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.sql import exists
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.db import l3_db
|
||||
from quantum.db import agents_db
|
||||
from quantum.db import agentschedulers_db
|
||||
from quantum.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChanceScheduler(object):
|
||||
"""Allocate a L3 agent for a router in a random way.
|
||||
More sophisticated scheduler (similar to filter scheduler in nova?)
|
||||
can be introduced later."""
|
||||
|
||||
def auto_schedule_routers(self, plugin, context, host, router_id):
|
||||
"""Schedule non-hosted routers to L3 Agent running on host.
|
||||
If router_id is given, only this router is scheduled
|
||||
if it is not hosted yet.
|
||||
Don't schedule the routers which are hosted already
|
||||
by active l3 agents.
|
||||
"""
|
||||
with context.session.begin(subtransactions=True):
|
||||
# query if we have valid l3 agent on the host
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter(agents_db.Agent.agent_type ==
|
||||
constants.AGENT_TYPE_L3,
|
||||
agents_db.Agent.host == host,
|
||||
agents_db.Agent.admin_state_up == True)
|
||||
try:
|
||||
l3_agent = query.one()
|
||||
except (exc.MultipleResultsFound, exc.NoResultFound):
|
||||
LOG.debug(_('No enabled L3 agent on host %s'),
|
||||
host)
|
||||
return False
|
||||
if agents_db.AgentDbMixin.is_agent_down(
|
||||
l3_agent.heartbeat_timestamp):
|
||||
LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
|
||||
# check if the specified router is hosted
|
||||
if router_id:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [router_id], admin_state_up=True)
|
||||
if l3_agents:
|
||||
LOG.debug(_('Router %(router_id)s has already been hosted'
|
||||
' by L3 agent %(agent_id)s'),
|
||||
{'router_id': router_id,
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
return False
|
||||
|
||||
# get the router ids
|
||||
if router_id:
|
||||
router_ids = [(router_id,)]
|
||||
else:
|
||||
# get all routers that are not hosted
|
||||
#TODO(gongysh) consider the disabled agent's router
|
||||
stmt = ~exists().where(
|
||||
l3_db.Router.id ==
|
||||
agentschedulers_db.RouterL3AgentBinding.router_id)
|
||||
router_ids = context.session.query(
|
||||
l3_db.Router.id).filter(stmt).all()
|
||||
if not router_ids:
|
||||
LOG.debug(_('No non-hosted routers'))
|
||||
return False
|
||||
|
||||
# check if the configuration of l3 agent is compatible
|
||||
# with the router
|
||||
router_ids = [router_id[0] for router_id in router_ids]
|
||||
routers = plugin.get_routers(context, filters={'id': router_ids})
|
||||
to_removed_ids = []
|
||||
for router in routers:
|
||||
candidates = plugin.get_l3_agent_candidates(router, [l3_agent])
|
||||
if not candidates:
|
||||
to_removed_ids.append(router['id'])
|
||||
router_ids = list(set(router_ids) - set(to_removed_ids))
|
||||
if not router_ids:
|
||||
LOG.warn(_('No routers compatible with L3 agent configuration'
|
||||
' on host %s', host))
|
||||
return False
|
||||
|
||||
# binding
|
||||
for router_id in router_ids:
|
||||
binding = agentschedulers_db.RouterL3AgentBinding()
|
||||
binding.l3_agent = l3_agent
|
||||
binding.router_id = router_id
|
||||
binding.default = True
|
||||
context.session.add(binding)
|
||||
return True
|
||||
|
||||
def schedule(self, plugin, context, sync_router):
|
||||
"""Schedule the router to an active L3 agent if there
|
||||
is no enable L3 agent hosting it.
|
||||
"""
|
||||
with context.session.begin(subtransactions=True):
|
||||
# allow one router is hosted by just
|
||||
# one enabled l3 agent hosting since active is just a
|
||||
# timing problem. Non-active l3 agent can return to
|
||||
# active any time
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
context, [sync_router['id']], admin_state_up=True)
|
||||
if l3_agents:
|
||||
LOG.debug(_('Router %(router_id)s has already been hosted'
|
||||
' by L3 agent %(agent_id)s'),
|
||||
{'router_id': sync_router['id'],
|
||||
'agent_id': l3_agents[0]['id']})
|
||||
return False
|
||||
|
||||
active_l3_agents = plugin.get_l3_agents(context, active=True)
|
||||
if not active_l3_agents:
|
||||
LOG.warn(_('No active L3 agents'))
|
||||
return False
|
||||
candidates = plugin.get_l3_agent_candidates(sync_router,
|
||||
active_l3_agents)
|
||||
if not candidates:
|
||||
LOG.warn(_('No L3 agents can host the router %s'),
|
||||
sync_router['id'])
|
||||
return False
|
||||
|
||||
chosen_agent = random.choice(candidates)
|
||||
binding = agentschedulers_db.RouterL3AgentBinding()
|
||||
binding.l3_agent = chosen_agent
|
||||
binding.router_id = sync_router['id']
|
||||
context.session.add(binding)
|
||||
LOG.debug(_('Router %(router_id)s is scheduled to '
|
||||
'L3 agent %(agent_id)s'),
|
||||
{'router_id': sync_router['id'],
|
||||
'agent_id': chosen_agent['id']})
|
||||
return True
|
803
quantum/tests/unit/openvswitch/test_agent_scheduler.py
Normal file
803
quantum/tests/unit/openvswitch/test_agent_scheduler.py
Normal file
@ -0,0 +1,803 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
|
||||
import mock
|
||||
from webob import exc
|
||||
|
||||
from quantum.api import extensions
|
||||
from quantum.common import constants
|
||||
from quantum import context
|
||||
from quantum.db import agents_db
|
||||
from quantum.db import dhcp_rpc_base
|
||||
from quantum.db import l3_rpc_base
|
||||
from quantum.extensions import agentscheduler
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import uuidutils
|
||||
from quantum.plugins.openvswitch.ovs_quantum_plugin import OVSQuantumPluginV2
|
||||
from quantum.tests.unit import test_agent_ext_plugin
|
||||
from quantum.tests.unit.testlib_api import create_request
|
||||
from quantum.tests.unit import test_db_plugin as test_plugin
|
||||
from quantum.tests.unit import test_extensions
|
||||
from quantum.tests.unit import test_l3_plugin
|
||||
from quantum.wsgi import Serializer
|
||||
|
||||
L3_HOSTA = 'hosta'
|
||||
DHCP_HOSTA = 'hosta'
|
||||
L3_HOSTB = 'hostb'
|
||||
DHCP_HOSTC = 'hostc'
|
||||
|
||||
|
||||
class AgentSchedulerTestMixIn(object):
|
||||
|
||||
def _request_list(self, path, admin_context=True,
|
||||
expected_code=exc.HTTPOk.code):
|
||||
req = self._path_req(path, admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
return self.deserialize(self.fmt, res)
|
||||
|
||||
def _path_req(self, path, method='GET', data=None,
|
||||
query_string=None,
|
||||
admin_context=True):
|
||||
content_type = 'application/%s' % self.fmt
|
||||
body = None
|
||||
if data is not None: # empty dict is valid
|
||||
body = Serializer().serialize(data, content_type)
|
||||
if admin_context:
|
||||
return create_request(
|
||||
path, body, content_type, method, query_string=query_string)
|
||||
else:
|
||||
return create_request(
|
||||
path, body, content_type, method, query_string=query_string,
|
||||
context=context.Context('', 'tenant_id'))
|
||||
|
||||
def _path_create_request(self, path, data, admin_context=True):
|
||||
return self._path_req(path, method='POST', data=data,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _path_show_request(self, path, admin_context=True):
|
||||
return self._path_req(path, admin_context=admin_context)
|
||||
|
||||
def _path_delete_request(self, path, admin_context=True):
|
||||
return self._path_req(path, method='DELETE',
|
||||
admin_context=admin_context)
|
||||
|
||||
def _path_update_request(self, path, data, admin_context=True):
|
||||
return self._path_req(path, method='PUT', data=data,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _list_routers_hosted_by_l3_agent(self, agent_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s.%s" % (agent_id,
|
||||
agentscheduler.L3_ROUTERS,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _list_networks_hosted_by_dhcp_agent(self, agent_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s.%s" % (agent_id,
|
||||
agentscheduler.DHCP_NETS,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _list_l3_agents_hosting_router(self, router_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/routers/%s/%s.%s" % (router_id,
|
||||
agentscheduler.L3_AGENTS,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _list_dhcp_agents_hosting_network(self, network_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/networks/%s/%s.%s" % (network_id,
|
||||
agentscheduler.DHCP_AGENTS,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _add_router_to_l3_agent(self, id, router_id,
|
||||
expected_code=exc.HTTPCreated.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s.%s" % (id,
|
||||
agentscheduler.L3_ROUTERS,
|
||||
self.fmt)
|
||||
req = self._path_create_request(path,
|
||||
{'router_id': router_id},
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
|
||||
def _add_network_to_dhcp_agent(self, id, network_id,
|
||||
expected_code=exc.HTTPCreated.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s.%s" % (id,
|
||||
agentscheduler.DHCP_NETS,
|
||||
self.fmt)
|
||||
req = self._path_create_request(path,
|
||||
{'network_id': network_id},
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
|
||||
def _remove_network_from_dhcp_agent(self, id, network_id,
|
||||
expected_code=exc.HTTPNoContent.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s/%s.%s" % (id,
|
||||
agentscheduler.DHCP_NETS,
|
||||
network_id,
|
||||
self.fmt)
|
||||
req = self._path_delete_request(path,
|
||||
admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
|
||||
def _remove_router_from_l3_agent(self, id, router_id,
|
||||
expected_code=exc.HTTPNoContent.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s/%s.%s" % (id,
|
||||
agentscheduler.L3_ROUTERS,
|
||||
router_id,
|
||||
self.fmt)
|
||||
req = self._path_delete_request(path, admin_context=admin_context)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, expected_code)
|
||||
|
||||
def _register_one_agent_state(self, agent_state):
|
||||
callback = agents_db.AgentExtRpcCallback()
|
||||
callback.report_state(self.adminContext,
|
||||
agent_state={'agent_state': agent_state})
|
||||
|
||||
def _disable_agent(self, agent_id, admin_state_up=False):
|
||||
new_agent = {}
|
||||
new_agent['agent'] = {}
|
||||
new_agent['agent']['admin_state_up'] = admin_state_up
|
||||
self._update('agents', agent_id, new_agent)
|
||||
|
||||
def _get_agent_id(self, agent_type, host):
|
||||
agents = self._list_agents()
|
||||
for agent in agents['agents']:
|
||||
if (agent['agent_type'] == agent_type and
|
||||
agent['host'] == host):
|
||||
return agent['id']
|
||||
|
||||
|
||||
class AgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
|
||||
test_agent_ext_plugin.AgentDBTestMixIn,
|
||||
AgentSchedulerTestMixIn,
|
||||
test_plugin.QuantumDbPluginV2TestCase):
|
||||
fmt = 'json'
|
||||
|
||||
def setUp(self):
|
||||
plugin = ('quantum.plugins.openvswitch.'
|
||||
'ovs_quantum_plugin.OVSQuantumPluginV2')
|
||||
self.dhcp_notifier_cls_p = mock.patch(
|
||||
'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
|
||||
'DhcpAgentNotifyAPI')
|
||||
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
|
||||
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
|
||||
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
|
||||
super(AgentSchedulerTestCase, self).setUp(plugin)
|
||||
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
|
||||
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
||||
self.adminContext = context.get_admin_context()
|
||||
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
|
||||
self.addCleanup(self.dhcp_notifier_cls_p.stop)
|
||||
|
||||
def test_report_states(self):
|
||||
self._register_agent_states()
|
||||
agents = self._list_agents()
|
||||
self.assertEqual(4, len(agents['agents']))
|
||||
|
||||
def test_network_scheduling_on_network_creation(self):
|
||||
self._register_agent_states()
|
||||
with self.network() as net:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net['network']['id'])
|
||||
self.assertEqual(1, len(dhcp_agents['agents']))
|
||||
|
||||
def test_network_auto_schedule_with_disabled(self):
|
||||
with contextlib.nested(self.network(),
|
||||
self.network()):
|
||||
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTC)
|
||||
self._disable_agent(hosta_id)
|
||||
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
|
||||
# second agent will host all the networks since first is disabled.
|
||||
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
|
||||
networks = self._list_networks_hosted_by_dhcp_agent(hostc_id)
|
||||
num_hostc_nets = len(networks['networks'])
|
||||
networks = self._list_networks_hosted_by_dhcp_agent(hosta_id)
|
||||
num_hosta_nets = len(networks['networks'])
|
||||
self.assertEqual(0, num_hosta_nets)
|
||||
self.assertEqual(2, num_hostc_nets)
|
||||
|
||||
def test_network_auto_schedule_with_hosted(self):
|
||||
# one agent hosts all the networks, other hosts none
|
||||
with contextlib.nested(self.network(),
|
||||
self.network()) as (net1, net2):
|
||||
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
|
||||
self._register_agent_states()
|
||||
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
|
||||
# second agent will not host the network since first has got it.
|
||||
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net1['network']['id'])
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTC)
|
||||
hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
|
||||
num_hosta_nets = len(hosta_nets['networks'])
|
||||
hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
|
||||
num_hostc_nets = len(hostc_nets['networks'])
|
||||
|
||||
self.assertEqual(2, num_hosta_nets)
|
||||
self.assertEqual(0, num_hostc_nets)
|
||||
self.assertEqual(1, len(dhcp_agents['agents']))
|
||||
self.assertEqual(DHCP_HOSTA, dhcp_agents['agents'][0]['host'])
|
||||
|
||||
def test_network_auto_schedule_with_hosted_2(self):
|
||||
# one agent hosts one network
|
||||
dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
|
||||
dhcp_hosta = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': DHCP_HOSTA,
|
||||
'topic': 'DHCP_AGENT',
|
||||
'configurations': {'dhcp_driver': 'dhcp_driver',
|
||||
'use_namespaces': True,
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
dhcp_hostc = copy.deepcopy(dhcp_hosta)
|
||||
dhcp_hostc['host'] = DHCP_HOSTC
|
||||
with self.network() as net1:
|
||||
self._register_one_agent_state(dhcp_hosta)
|
||||
dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
self._disable_agent(hosta_id, admin_state_up=False)
|
||||
with self.network() as net2:
|
||||
self._register_one_agent_state(dhcp_hostc)
|
||||
dhcp_rpc.get_active_networks(self.adminContext,
|
||||
host=DHCP_HOSTC)
|
||||
dhcp_agents_1 = self._list_dhcp_agents_hosting_network(
|
||||
net1['network']['id'])
|
||||
dhcp_agents_2 = self._list_dhcp_agents_hosting_network(
|
||||
net2['network']['id'])
|
||||
hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
|
||||
num_hosta_nets = len(hosta_nets['networks'])
|
||||
hostc_id = self._get_agent_id(
|
||||
constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTC)
|
||||
hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
|
||||
num_hostc_nets = len(hostc_nets['networks'])
|
||||
|
||||
self.assertEqual(1, num_hosta_nets)
|
||||
self.assertEqual(1, num_hostc_nets)
|
||||
self.assertEqual(1, len(dhcp_agents_1['agents']))
|
||||
self.assertEqual(1, len(dhcp_agents_2['agents']))
|
||||
self.assertEqual(DHCP_HOSTA, dhcp_agents_1['agents'][0]['host'])
|
||||
self.assertEqual(DHCP_HOSTC, dhcp_agents_2['agents'][0]['host'])
|
||||
|
||||
def test_network_scheduling_on_port_creation(self):
|
||||
with self.subnet() as subnet:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
subnet['subnet']['network_id'])
|
||||
result0 = len(dhcp_agents['agents'])
|
||||
self._register_agent_states()
|
||||
with self.port(subnet=subnet,
|
||||
device_owner="compute:test:" + DHCP_HOSTA) as port:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
port['port']['network_id'])
|
||||
result1 = len(dhcp_agents['agents'])
|
||||
self.assertEqual(0, result0)
|
||||
self.assertEqual(1, result1)
|
||||
|
||||
def test_network_scheduler_with_disabled_agent(self):
|
||||
dhcp_hosta = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': DHCP_HOSTA,
|
||||
'topic': 'DHCP_AGENT',
|
||||
'configurations': {'dhcp_driver': 'dhcp_driver',
|
||||
'use_namespaces': True,
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
self._register_one_agent_state(dhcp_hosta)
|
||||
with self.network() as net1:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net1['network']['id'])
|
||||
self.assertEqual(1, len(dhcp_agents['agents']))
|
||||
agents = self._list_agents()
|
||||
self._disable_agent(agents['agents'][0]['id'])
|
||||
with self.network() as net2:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net2['network']['id'])
|
||||
self.assertEqual(0, len(dhcp_agents['agents']))
|
||||
|
||||
def test_network_scheduler_with_down_agent(self):
|
||||
dhcp_hosta = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': DHCP_HOSTA,
|
||||
'topic': 'DHCP_AGENT',
|
||||
'configurations': {'dhcp_driver': 'dhcp_driver',
|
||||
'use_namespaces': True,
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
self._register_one_agent_state(dhcp_hosta)
|
||||
is_agent_down_str = 'quantum.db.agents_db.AgentDbMixin.is_agent_down'
|
||||
with mock.patch(is_agent_down_str) as mock_is_agent_down:
|
||||
mock_is_agent_down.return_value = False
|
||||
with self.network() as net:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net['network']['id'])
|
||||
self.assertEqual(1, len(dhcp_agents['agents']))
|
||||
with mock.patch(is_agent_down_str) as mock_is_agent_down:
|
||||
mock_is_agent_down.return_value = True
|
||||
with self.network() as net:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net['network']['id'])
|
||||
self.assertEqual(0, len(dhcp_agents['agents']))
|
||||
|
||||
def test_network_scheduler_with_hosted_network(self):
|
||||
dhcp_hosta = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': DHCP_HOSTA,
|
||||
'topic': 'DHCP_AGENT',
|
||||
'configurations': {'dhcp_driver': 'dhcp_driver',
|
||||
'use_namespaces': True,
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
self._register_one_agent_state(dhcp_hosta)
|
||||
agents = self._list_agents()
|
||||
with self.network() as net1:
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net1['network']['id'])
|
||||
self.assertEqual(1, len(dhcp_agents['agents']))
|
||||
with mock.patch.object(OVSQuantumPluginV2,
|
||||
'get_dhcp_agents_hosting_networks',
|
||||
autospec=True) as mock_hosting_agents:
|
||||
|
||||
mock_hosting_agents.return_value = agents['agents']
|
||||
with self.network(do_delete=False) as net2:
|
||||
pass
|
||||
dhcp_agents = self._list_dhcp_agents_hosting_network(
|
||||
net2['network']['id'])
|
||||
self.assertEqual(0, len(dhcp_agents['agents']))
|
||||
|
||||
def test_network_policy(self):
|
||||
with self.network() as net1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
self._list_networks_hosted_by_dhcp_agent(
|
||||
hosta_id, expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._add_network_to_dhcp_agent(
|
||||
hosta_id, net1['network']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._add_network_to_dhcp_agent(hosta_id,
|
||||
net1['network']['id'])
|
||||
self._remove_network_from_dhcp_agent(
|
||||
hosta_id, net1['network']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._list_dhcp_agents_hosting_network(
|
||||
net1['network']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
|
||||
def test_network_add_to_dhcp_agent(self):
|
||||
with self.network() as net1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
num_before_add = len(
|
||||
self._list_networks_hosted_by_dhcp_agent(
|
||||
hosta_id)['networks'])
|
||||
self._add_network_to_dhcp_agent(hosta_id,
|
||||
net1['network']['id'])
|
||||
num_after_add = len(
|
||||
self._list_networks_hosted_by_dhcp_agent(
|
||||
hosta_id)['networks'])
|
||||
self.assertEqual(0, num_before_add)
|
||||
self.assertEqual(1, num_after_add)
|
||||
|
||||
def test_network_remove_from_dhcp_agent(self):
|
||||
dhcp_hosta = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': DHCP_HOSTA,
|
||||
'topic': 'DHCP_AGENT',
|
||||
'configurations': {'dhcp_driver': 'dhcp_driver',
|
||||
'use_namespaces': True,
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
self._register_one_agent_state(dhcp_hosta)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
|
||||
DHCP_HOSTA)
|
||||
with self.network() as net1:
|
||||
num_before_remove = len(
|
||||
self._list_networks_hosted_by_dhcp_agent(
|
||||
hosta_id)['networks'])
|
||||
self._remove_network_from_dhcp_agent(hosta_id,
|
||||
net1['network']['id'])
|
||||
num_after_remove = len(
|
||||
self._list_networks_hosted_by_dhcp_agent(
|
||||
hosta_id)['networks'])
|
||||
self.assertEqual(1, num_before_remove)
|
||||
self.assertEqual(0, num_after_remove)
|
||||
|
||||
def test_router_auto_schedule_with_hosted(self):
|
||||
with self.router() as router:
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
self._register_agent_states()
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])
|
||||
self.assertEqual(1, len(l3_agents['agents']))
|
||||
self.assertEqual(L3_HOSTA, l3_agents['agents'][0]['host'])
|
||||
|
||||
def test_router_auto_schedule_with_hosted_2(self):
|
||||
# one agent hosts one router
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_hosta = {
|
||||
'binary': 'quantum-l3-agent',
|
||||
'host': L3_HOSTA,
|
||||
'topic': 'L3_AGENT',
|
||||
'configurations': {'use_namespaces': True,
|
||||
'router_id': None,
|
||||
'handle_internal_only_routers':
|
||||
True,
|
||||
'gateway_external_network_id':
|
||||
None,
|
||||
'interface_driver': 'interface_driver',
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_L3}
|
||||
l3_hostb = copy.deepcopy(l3_hosta)
|
||||
l3_hostb['host'] = L3_HOSTB
|
||||
with self.router() as router1:
|
||||
self._register_one_agent_state(l3_hosta)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._disable_agent(hosta_id, admin_state_up=False)
|
||||
with self.router() as router2:
|
||||
self._register_one_agent_state(l3_hostb)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
router1['router']['id'])
|
||||
l3_agents_2 = self._list_l3_agents_hosting_router(
|
||||
router2['router']['id'])
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
num_hosta_routers = len(hosta_routers['routers'])
|
||||
hostb_id = self._get_agent_id(
|
||||
constants.AGENT_TYPE_L3,
|
||||
L3_HOSTB)
|
||||
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
|
||||
num_hostc_routers = len(hostb_routers['routers'])
|
||||
|
||||
self.assertEqual(1, num_hosta_routers)
|
||||
self.assertEqual(1, num_hostc_routers)
|
||||
self.assertEqual(1, len(l3_agents_1['agents']))
|
||||
self.assertEqual(1, len(l3_agents_2['agents']))
|
||||
self.assertEqual(L3_HOSTA, l3_agents_1['agents'][0]['host'])
|
||||
self.assertEqual(L3_HOSTB, l3_agents_2['agents'][0]['host'])
|
||||
|
||||
def test_router_auto_schedule_with_disabled(self):
|
||||
with contextlib.nested(self.router(),
|
||||
self.router()):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTB)
|
||||
self._disable_agent(hosta_id)
|
||||
# first agent will not host router since it is disabled
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
# second agent will host all the routers since first is disabled.
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
|
||||
num_hostb_routers = len(hostb_routers['routers'])
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
num_hosta_routers = len(hosta_routers['routers'])
|
||||
self.assertEqual(2, num_hostb_routers)
|
||||
self.assertEqual(0, num_hosta_routers)
|
||||
|
||||
def test_router_auto_schedule_with_candidates(self):
|
||||
l3_hosta = {
|
||||
'binary': 'quantum-l3-agent',
|
||||
'host': L3_HOSTA,
|
||||
'topic': 'L3_AGENT',
|
||||
'configurations': {'use_namespaces': False,
|
||||
'router_id': None,
|
||||
'handle_internal_only_routers':
|
||||
True,
|
||||
'gateway_external_network_id':
|
||||
None,
|
||||
'interface_driver': 'interface_driver',
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_L3}
|
||||
with contextlib.nested(self.router(),
|
||||
self.router()) as (router1, router2):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_hosta['configurations']['router_id'] = router1['router']['id']
|
||||
self._register_one_agent_state(l3_hosta)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
num_hosta_routers = len(hosta_routers['routers'])
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
router1['router']['id'])
|
||||
l3_agents_2 = self._list_l3_agents_hosting_router(
|
||||
router2['router']['id'])
|
||||
# L3 agent will host only the compatible router.
|
||||
self.assertEqual(1, num_hosta_routers)
|
||||
self.assertEqual(1, len(l3_agents_1['agents']))
|
||||
self.assertEqual(0, len(l3_agents_2['agents']))
|
||||
|
||||
def test_router_schedule_with_candidates(self):
|
||||
l3_hosta = {
|
||||
'binary': 'quantum-l3-agent',
|
||||
'host': L3_HOSTA,
|
||||
'topic': 'L3_AGENT',
|
||||
'configurations': {'use_namespaces': False,
|
||||
'router_id': None,
|
||||
'handle_internal_only_routers':
|
||||
True,
|
||||
'gateway_external_network_id':
|
||||
None,
|
||||
'interface_driver': 'interface_driver',
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_L3}
|
||||
with contextlib.nested(self.router(),
|
||||
self.router(),
|
||||
self.subnet(),
|
||||
self.subnet(cidr='10.0.3.0/24')) as (router1,
|
||||
router2,
|
||||
subnet1,
|
||||
subnet2):
|
||||
l3_hosta['configurations']['router_id'] = router1['router']['id']
|
||||
self._register_one_agent_state(l3_hosta)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._router_interface_action('add',
|
||||
router1['router']['id'],
|
||||
subnet1['subnet']['id'],
|
||||
None)
|
||||
self._router_interface_action('add',
|
||||
router2['router']['id'],
|
||||
subnet2['subnet']['id'],
|
||||
None)
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
num_hosta_routers = len(hosta_routers['routers'])
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
router1['router']['id'])
|
||||
l3_agents_2 = self._list_l3_agents_hosting_router(
|
||||
router2['router']['id'])
|
||||
# L3 agent will host only the compatible router.
|
||||
self.assertEqual(1, num_hosta_routers)
|
||||
self.assertEqual(1, len(l3_agents_1['agents']))
|
||||
self.assertEqual(0, len(l3_agents_2['agents']))
|
||||
|
||||
def test_router_without_l3_agents(self):
|
||||
with self.subnet() as s:
|
||||
self._set_net_external(s['subnet']['network_id'])
|
||||
data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
|
||||
data['router']['name'] = 'router1'
|
||||
data['router']['external_gateway_info'] = {
|
||||
'network_id': s['subnet']['network_id']}
|
||||
router_req = self.new_create_request('routers', data, self.fmt)
|
||||
res = router_req.get_response(self.ext_api)
|
||||
router = self.deserialize(self.fmt, res)
|
||||
l3agents = (
|
||||
self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers(
|
||||
self.adminContext, [router['router']['id']]))
|
||||
self._delete('routers', router['router']['id'])
|
||||
self.assertEqual(0, len(l3agents))
|
||||
|
||||
def test_router_sync_data(self):
|
||||
with contextlib.nested(self.subnet(),
|
||||
self.subnet(cidr='10.0.2.0/24'),
|
||||
self.subnet(cidr='10.0.3.0/24')) as (
|
||||
s1, s2, s3):
|
||||
self._register_agent_states()
|
||||
self._set_net_external(s1['subnet']['network_id'])
|
||||
data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
|
||||
data['router']['name'] = 'router1'
|
||||
data['router']['external_gateway_info'] = {
|
||||
'network_id': s1['subnet']['network_id']}
|
||||
router_req = self.new_create_request('routers', data, self.fmt)
|
||||
res = router_req.get_response(self.ext_api)
|
||||
router = self.deserialize(self.fmt, res)
|
||||
self._router_interface_action('add',
|
||||
router['router']['id'],
|
||||
s2['subnet']['id'],
|
||||
None)
|
||||
self._router_interface_action('add',
|
||||
router['router']['id'],
|
||||
s3['subnet']['id'],
|
||||
None)
|
||||
l3agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])
|
||||
self.assertEqual(1, len(l3agents['agents']))
|
||||
agents = self._list_agents()
|
||||
another_l3_agent_id = None
|
||||
another_l3_agent_host = None
|
||||
default = l3agents['agents'][0]['id']
|
||||
for com in agents['agents']:
|
||||
if (com['id'] != default and
|
||||
com['agent_type'] == constants.AGENT_TYPE_L3):
|
||||
another_l3_agent_id = com['id']
|
||||
another_l3_agent_host = com['host']
|
||||
break
|
||||
self.assertTrue(another_l3_agent_id is not None)
|
||||
self._add_router_to_l3_agent(another_l3_agent_id,
|
||||
router['router']['id'],
|
||||
expected_code=exc.HTTPConflict.code)
|
||||
self._remove_router_from_l3_agent(default,
|
||||
router['router']['id'])
|
||||
self._add_router_to_l3_agent(another_l3_agent_id,
|
||||
router['router']['id'])
|
||||
l3agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])
|
||||
self.assertEqual(another_l3_agent_host,
|
||||
l3agents['agents'][0]['host'])
|
||||
self._remove_router_from_l3_agent(another_l3_agent_id,
|
||||
router['router']['id'])
|
||||
self._router_interface_action('remove',
|
||||
router['router']['id'],
|
||||
s2['subnet']['id'],
|
||||
None)
|
||||
l3agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])
|
||||
self.assertEqual(1,
|
||||
len(l3agents['agents']))
|
||||
self._router_interface_action('remove',
|
||||
router['router']['id'],
|
||||
s3['subnet']['id'],
|
||||
None)
|
||||
self._delete('routers', router['router']['id'])
|
||||
|
||||
def test_router_add_to_l3_agent(self):
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
num_before_add = len(
|
||||
self._list_routers_hosted_by_l3_agent(
|
||||
hosta_id)['routers'])
|
||||
self._add_router_to_l3_agent(hosta_id,
|
||||
router1['router']['id'])
|
||||
hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTB)
|
||||
self._add_router_to_l3_agent(hostb_id,
|
||||
router1['router']['id'],
|
||||
expected_code=exc.HTTPConflict.code)
|
||||
num_after_add = len(
|
||||
self._list_routers_hosted_by_l3_agent(
|
||||
hosta_id)['routers'])
|
||||
self.assertEqual(0, num_before_add)
|
||||
self.assertEqual(1, num_after_add)
|
||||
|
||||
def test_router_policy(self):
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._list_routers_hosted_by_l3_agent(
|
||||
hosta_id, expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._add_router_to_l3_agent(
|
||||
hosta_id, router1['router']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._add_router_to_l3_agent(
|
||||
hosta_id, router1['router']['id'])
|
||||
self._remove_router_from_l3_agent(
|
||||
hosta_id, router1['router']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._list_l3_agents_hosting_router(
|
||||
router1['router']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
|
||||
|
||||
class L3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
|
||||
test_agent_ext_plugin.AgentDBTestMixIn,
|
||||
AgentSchedulerTestMixIn,
|
||||
test_plugin.QuantumDbPluginV2TestCase):
|
||||
def setUp(self):
|
||||
plugin = ('quantum.plugins.openvswitch.'
|
||||
'ovs_quantum_plugin.OVSQuantumPluginV2')
|
||||
self.dhcp_notifier_cls_p = mock.patch(
|
||||
'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
|
||||
'DhcpAgentNotifyAPI')
|
||||
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
|
||||
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
|
||||
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
|
||||
super(L3AgentNotifierTestCase, self).setUp(plugin)
|
||||
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
|
||||
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
||||
self.adminContext = context.get_admin_context()
|
||||
self.addCleanup(self.dhcp_notifier_cls_p.stop)
|
||||
|
||||
def test_router_add_to_l3_agent_notification(self):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._add_router_to_l3_agent(hosta_id,
|
||||
router1['router']['id'])
|
||||
routers = plugin.get_sync_data(self.adminContext,
|
||||
[router1['router']['id']])
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY,
|
||||
plugin.l3_agent_notifier.make_msg(
|
||||
'router_added_to_agent',
|
||||
payload=routers),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
def test_router_remove_from_l3_agent_notification(self):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._add_router_to_l3_agent(hosta_id,
|
||||
router1['router']['id'])
|
||||
self._remove_router_from_l3_agent(hosta_id,
|
||||
router1['router']['id'])
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY, plugin.l3_agent_notifier.make_msg(
|
||||
'router_removed_from_agent',
|
||||
payload={'router_id': router1['router']['id']}),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
def test_agent_updated_l3_agent_notification(self):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._disable_agent(hosta_id, admin_state_up=False)
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY, plugin.l3_agent_notifier.make_msg(
|
||||
'agent_updated',
|
||||
payload={'admin_state_up': False}),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
|
||||
class AgentSchedulerTestCaseXML(AgentSchedulerTestCase):
|
||||
fmt = 'xml'
|
@ -62,28 +62,17 @@ class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
supported_extension_aliases = ["agent"]
|
||||
|
||||
|
||||
class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
fmt = 'json'
|
||||
|
||||
def setUp(self):
|
||||
self.adminContext = context.get_admin_context()
|
||||
test_config['plugin_name_v2'] = (
|
||||
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
|
||||
# for these tests we need to enable overlapping ips
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
ext_mgr = AgentTestExtensionManager()
|
||||
test_config['extension_manager'] = ext_mgr
|
||||
super(AgentDBTestCase, self).setUp()
|
||||
class AgentDBTestMixIn(object):
|
||||
|
||||
def _list_agents(self, expected_res_status=None,
|
||||
quantum_context=None,
|
||||
query_string=None):
|
||||
comp_res = self._list('agents',
|
||||
quantum_context=quantum_context,
|
||||
query_params=query_string)
|
||||
agent_res = self._list('agents',
|
||||
quantum_context=quantum_context,
|
||||
query_params=query_string)
|
||||
if expected_res_status:
|
||||
self.assertEqual(comp_res.status_int, expected_res_status)
|
||||
return comp_res
|
||||
self.assertEqual(agent_res.status_int, expected_res_status)
|
||||
return agent_res
|
||||
|
||||
def _register_agent_states(self):
|
||||
"""Register two L3 agents and two DHCP agents."""
|
||||
@ -123,6 +112,21 @@ class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
agent_state={'agent_state': dhcp_hostc})
|
||||
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
|
||||
|
||||
|
||||
class AgentDBTestCase(AgentDBTestMixIn,
|
||||
test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
fmt = 'json'
|
||||
|
||||
def setUp(self):
|
||||
self.adminContext = context.get_admin_context()
|
||||
test_config['plugin_name_v2'] = (
|
||||
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
|
||||
# for these tests we need to enable overlapping ips
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
ext_mgr = AgentTestExtensionManager()
|
||||
test_config['extension_manager'] = ext_mgr
|
||||
super(AgentDBTestCase, self).setUp()
|
||||
|
||||
def test_create_agent(self):
|
||||
data = {'agent': {}}
|
||||
_req = self.new_create_request('agents', data, self.fmt)
|
||||
|
@ -25,7 +25,7 @@ class TestDhcpRpcCallackMixin(testtools.TestCase):
|
||||
super(TestDhcpRpcCallackMixin, self).setUp()
|
||||
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
|
||||
get_plugin = self.plugin_p.start()
|
||||
self.plugin = mock.Mock()
|
||||
self.plugin = mock.MagicMock()
|
||||
get_plugin.return_value = self.plugin
|
||||
self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin()
|
||||
self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG')
|
||||
|
@ -88,7 +88,7 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
def testRouterInfoCreate(self):
|
||||
id = _uuid()
|
||||
ri = l3_agent.RouterInfo(id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces, None)
|
||||
|
||||
self.assertTrue(ri.ns_name().endswith(id))
|
||||
|
||||
@ -100,7 +100,7 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
router_id = _uuid()
|
||||
network_id = _uuid()
|
||||
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces, None)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
cidr = '99.0.1.9/24'
|
||||
mac = 'ca:fe:de:ad:be:ef'
|
||||
@ -128,7 +128,7 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
def _test_external_gateway_action(self, action):
|
||||
router_id = _uuid()
|
||||
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces, None)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
|
||||
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
|
||||
@ -172,7 +172,7 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
def _test_floating_ip_action(self, action):
|
||||
router_id = _uuid()
|
||||
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces, None)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
floating_ip = '20.0.0.100'
|
||||
fixed_ip = '10.0.0.23'
|
||||
@ -227,7 +227,8 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
|
||||
router_id = _uuid()
|
||||
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces,
|
||||
None)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
|
||||
fake_route1 = {'destination': '135.207.0.0/16',
|
||||
@ -274,7 +275,8 @@ class TestBasicRouterOperations(testtools.TestCase):
|
||||
router_id = _uuid()
|
||||
|
||||
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
|
||||
self.conf.use_namespaces)
|
||||
self.conf.use_namespaces,
|
||||
None)
|
||||
ri.router = {}
|
||||
|
||||
fake_old_routes = []
|
||||
|
@ -29,6 +29,7 @@ from webob import exc
|
||||
import webtest
|
||||
|
||||
from quantum.api import extensions
|
||||
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.common import config
|
||||
from quantum.common import constants as l3_constants
|
||||
@ -37,7 +38,6 @@ from quantum.common.test_lib import test_config
|
||||
from quantum import context
|
||||
from quantum.db import db_base_plugin_v2
|
||||
from quantum.db import l3_db
|
||||
from quantum.db import l3_rpc_agent_api
|
||||
from quantum.db import models_v2
|
||||
from quantum.extensions import l3
|
||||
from quantum.manager import QuantumManager
|
||||
@ -307,24 +307,7 @@ class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
return super(TestL3NatPlugin, self).delete_port(context, id)
|
||||
|
||||
|
||||
class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
|
||||
def setUp(self):
|
||||
test_config['plugin_name_v2'] = (
|
||||
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
|
||||
# for these tests we need to enable overlapping ips
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
ext_mgr = L3TestExtensionManager()
|
||||
test_config['extension_manager'] = ext_mgr
|
||||
super(L3NatTestCaseBase, self).setUp()
|
||||
|
||||
# Set to None to reload the drivers
|
||||
notifier_api._drivers = None
|
||||
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
|
||||
|
||||
def tearDown(self):
|
||||
test_notifier.NOTIFICATIONS = []
|
||||
super(L3NatTestCaseBase, self).tearDown()
|
||||
class L3NatTestCaseMixin(object):
|
||||
|
||||
def _create_network(self, fmt, name, admin_state_up, **kwargs):
|
||||
""" Override the routine for allowing the router:external attribute """
|
||||
@ -334,7 +317,7 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
kwargs),
|
||||
kwargs.values()))
|
||||
arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,)
|
||||
return super(L3NatTestCaseBase, self)._create_network(
|
||||
return super(L3NatTestCaseMixin, self)._create_network(
|
||||
fmt, name, admin_state_up, arg_list=arg_list, **new_args)
|
||||
|
||||
def _create_router(self, fmt, tenant_id, name=None,
|
||||
@ -505,6 +488,27 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
public_sub['subnet']['network_id'])
|
||||
|
||||
|
||||
class L3NatTestCaseBase(L3NatTestCaseMixin,
|
||||
test_db_plugin.QuantumDbPluginV2TestCase):
|
||||
|
||||
def setUp(self):
|
||||
test_config['plugin_name_v2'] = (
|
||||
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
|
||||
# for these tests we need to enable overlapping ips
|
||||
cfg.CONF.set_default('allow_overlapping_ips', True)
|
||||
ext_mgr = L3TestExtensionManager()
|
||||
test_config['extension_manager'] = ext_mgr
|
||||
super(L3NatTestCaseBase, self).setUp()
|
||||
|
||||
# Set to None to reload the drivers
|
||||
notifier_api._drivers = None
|
||||
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
|
||||
|
||||
def tearDown(self):
|
||||
test_notifier.NOTIFICATIONS = []
|
||||
super(L3NatTestCaseBase, self).tearDown()
|
||||
|
||||
|
||||
class L3NatDBTestCase(L3NatTestCaseBase):
|
||||
|
||||
def test_router_create(self):
|
||||
@ -1459,7 +1463,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
|
||||
|
||||
def _test_notify_op_agent(self, target_func, *args):
|
||||
l3_rpc_agent_api_str = (
|
||||
'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI')
|
||||
'quantum.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
|
||||
oldNotify = l3_rpc_agent_api.L3AgentNotify
|
||||
try:
|
||||
with mock.patch(l3_rpc_agent_api_str) as notifyApi:
|
||||
|
6
tox.ini
6
tox.ini
@ -19,8 +19,12 @@ sitepackages = True
|
||||
downloadcache = ~/cache/pip
|
||||
|
||||
[testenv:pep8]
|
||||
# E712 comparison to False should be 'if cond is False:' or 'if not cond:'
|
||||
# query = query.filter(Component.disabled == False)
|
||||
# E125 continuation line does not distinguish itself from next logical line
|
||||
|
||||
commands =
|
||||
pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,openstack,*egg .
|
||||
pep8 --repeat --show-source --ignore=E125,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg .
|
||||
pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin
|
||||
|
||||
[testenv:i18n]
|
||||
|
Loading…
x
Reference in New Issue
Block a user