diff --git a/etc/policy.json b/etc/policy.json index a427507f5b..aa18ba6e10 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -58,5 +58,14 @@ "update_agent": "rule:admin_only", "delete_agent": "rule:admin_only", "get_agent": "rule:admin_only", - "get_agents": "rule:admin_only" + "get_agents": "rule:admin_only", + + "create_dhcp-network": "rule:admin_only", + "delete_dhcp-network": "rule:admin_only", + "get_dhcp-networks": "rule:admin_only", + "create_l3-router": "rule:admin_only", + "delete_l3-router": "rule:admin_only", + "get_l3-routers": "rule:admin_only", + "get_dhcp-agents": "rule:admin_only", + "get_l3-agents": "rule:admin_only" } diff --git a/etc/quantum.conf b/etc/quantum.conf index d14aac9ff9..89005b6363 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -198,6 +198,27 @@ notification_topics = notifications # Maximum number of fixed ips per port # max_fixed_ips_per_port = 5 +# =========== items for agent management extension ============= +# Seconds to regard the agent as down. +# agent_down_time = 5 +# =========== end of items for agent management extension ===== + +# =========== items for agent scheduler extension ============= +# Driver to use for scheduling network to DHCP agent +# network_scheduler_driver = quantum.scheduler.dhcp_agent_scheduler.ChanceScheduler +# Driver to use for scheduling router to a default L3 agent +# router_scheduler_driver = quantum.scheduler.l3_agent_scheduler.ChanceScheduler + +# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted +# networks to first DHCP agent which sends get_active_networks message to +# quantum server +# network_auto_schedule = True + +# Allow auto scheduling routers to L3 agent. It will schedule non-hosted +# routers to first L3 agent which sends sync_routers message to quantum server +# router_auto_schedule = True +# =========== end of items for agent scheduler extension ===== + [QUOTAS] # resource name(s) that are supported in quota features # quota_items = network,subnet,port @@ -217,11 +238,6 @@ notification_topics = notifications # default driver to use for quota checks # quota_driver = quantum.quota.ConfDriver -# =========== items for agent management extension ============= -# Seconds to regard the agent as down. -# agent_down_time = 5 -# =========== end of items for agent management extension ===== - [DEFAULT_SERVICETYPE] # Description of the default service type (optional) # description = "default service type" diff --git a/quantum/agent/dhcp_agent.py b/quantum/agent/dhcp_agent.py index 532647b9e3..917626468c 100644 --- a/quantum/agent/dhcp_agent.py +++ b/quantum/agent/dhcp_agent.py @@ -320,7 +320,7 @@ class DhcpPluginApi(proxy.RpcProxy): super(DhcpPluginApi, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) self.context = context - self.host = socket.gethostname() + self.host = cfg.CONF.host def get_active_networks(self): """Make a remote process call to retrieve the active networks.""" @@ -685,6 +685,11 @@ class DhcpAgentWithStateReport(DhcpAgent): if self.agent_state.pop('start_flag', None): self.run() + def agent_updated(self, context, payload): + """Handle the agent_updated notification event.""" + self.needs_resync = True + LOG.info(_("agent_updated by server side %s!"), payload) + def after_start(self): LOG.info(_("DHCP agent started")) diff --git a/quantum/agent/l3_agent.py b/quantum/agent/l3_agent.py index 06076db28e..c67ebe6a19 100644 --- a/quantum/agent/l3_agent.py +++ b/quantum/agent/l3_agent.py @@ -93,7 +93,7 @@ class L3PluginApi(proxy.RpcProxy): class RouterInfo(object): - def __init__(self, router_id, root_helper, use_namespaces, router=None): + def __init__(self, router_id, root_helper, use_namespaces, router): self.router_id = router_id self.ex_gw_port = None self.internal_ports = [] @@ -227,7 +227,7 @@ class L3NATAgent(manager.Manager): else: raise - def _router_added(self, router_id, router=None): + def _router_added(self, router_id, router): ri = RouterInfo(router_id, self.root_helper, self.conf.use_namespaces, router) self.router_info[router_id] = ri @@ -242,6 +242,10 @@ class L3NATAgent(manager.Manager): def _router_removed(self, router_id): ri = self.router_info[router_id] + ri.router['gw_port'] = None + ri.router[l3_constants.INTERFACE_KEY] = [] + ri.router[l3_constants.FLOATINGIP_KEY] = [] + self.process_router(ri) for c, r in self.metadata_filter_rules(): ri.iptables_manager.ipv4['filter'].remove_rule(c, r) for c, r in self.metadata_nat_rules(): @@ -568,7 +572,13 @@ class L3NATAgent(manager.Manager): LOG.debug(msg) self.fullsync = True - def _process_routers(self, routers): + def router_removed_from_agent(self, context, payload): + self.router_deleted(context, payload['router_id']) + + def router_added_to_agent(self, context, payload): + self.routers_updated(context, payload) + + def _process_routers(self, routers, all_routers=False): if (self.conf.external_network_bridge and not ip_lib.device_exists(self.conf.external_network_bridge)): LOG.error(_("The external network bridge '%s' does not exist"), @@ -576,7 +586,17 @@ class L3NATAgent(manager.Manager): return target_ex_net_id = self._fetch_external_net_id() - + # if routers are all the routers we have (They are from router sync on + # starting or when error occurs during running), we seek the + # routers which should be removed. + # If routers are from server side notification, we seek them + # from subset of incoming routers and ones we have now. + if all_routers: + prev_router_ids = set(self.router_info) + else: + prev_router_ids = set(self.router_info) & set( + [router['id'] for router in routers]) + cur_router_ids = set() for r in routers: if not r['admin_state_up']: continue @@ -593,13 +613,15 @@ class L3NATAgent(manager.Manager): if ex_net_id and ex_net_id != target_ex_net_id: continue - + cur_router_ids.add(r['id']) if r['id'] not in self.router_info: - self._router_added(r['id']) - + self._router_added(r['id'], r) ri = self.router_info[r['id']] ri.router = r self.process_router(ri) + # identify and remove routers that no longer exist + for router_id in prev_router_ids - cur_router_ids: + self._router_removed(router_id) @periodic_task.periodic_task def _sync_routers_task(self, context): @@ -613,8 +635,7 @@ class L3NATAgent(manager.Manager): router_id = None routers = self.plugin_rpc.get_routers( context, router_id) - self.router_info = {} - self._process_routers(routers) + self._process_routers(routers, all_routers=True) self.fullsync = False except Exception: LOG.exception(_("Failed synchronizing routers")) @@ -704,6 +725,11 @@ class L3NATAgentWithStateReport(L3NATAgent): except Exception: LOG.exception(_("Failed reporting state!")) + def agent_updated(self, context, payload): + """Handle the agent_updated notification event.""" + self.fullsync = True + LOG.info(_("agent_updated by server side %s!"), payload) + def main(): eventlet.monkey_patch() diff --git a/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 5cf03694f3..2a01ad18e8 100644 --- a/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from quantum.common import constants from quantum.common import topics +from quantum.common import utils +from quantum import manager from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import proxy @@ -40,11 +43,35 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): super(DhcpAgentNotifyAPI, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) - def _notification(self, context, method, payload): + def _get_dhcp_agents(self, context, network_id): + plugin = manager.QuantumManager.get_plugin() + dhcp_agents = plugin.get_dhcp_agents_hosting_networks( + context, [network_id], active=True) + return [(dhcp_agent.host, dhcp_agent.topic) for + dhcp_agent in dhcp_agents] + + def _notification_host(self, context, method, payload, host): + """Notify the agent on host""" + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topics.DHCP_AGENT, host)) + + def _notification(self, context, method, payload, network_id): """Notify all the agents that are hosting the network""" - # By now, we have no scheduling feature, so we fanout - # to all of the DHCP agents - self._notification_fanout(context, method, payload) + plugin = manager.QuantumManager.get_plugin() + if (method != 'network_delete_end' and utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)): + for (host, topic) in self._get_dhcp_agents(context, network_id): + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topic, host)) + else: + # besides the non-agentscheduler plugin, + # There is no way to query who is hosting the network + # when the network is deleted, so we need to fanout + self._notification_fanout(context, method, payload) def _notification_fanout(self, context, method, payload): """Fanout the payload to all dhcp agents""" @@ -53,6 +80,19 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): payload=payload), topic=topics.DHCP_AGENT) + def network_removed_from_agent(self, context, network_id, host): + self._notification_host(context, 'network_delete_end', + {'network_id': network_id}, host) + + def network_added_to_agent(self, context, network_id, host): + self._notification_host(context, 'network_create_end', + {'network': {'id': network_id}}, host) + + def agent_updated(self, context, admin_state_up, host): + self._notification_host(context, 'agent_updated', + {'admin_state_up': admin_state_up}, + host) + def notify(self, context, data, methodname): # data is {'key' : 'value'} with only one key if methodname not in self.VALID_METHOD_NAMES: @@ -61,10 +101,18 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): if obj_type not in self.VALID_RESOURCES: return obj_value = data[obj_type] + network_id = None + if obj_type == 'network' and 'id' in obj_value: + network_id = obj_value['id'] + elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value: + network_id = obj_value['network_id'] + if not network_id: + return methodname = methodname.replace(".", "_") if methodname.endswith("_delete_end"): if 'id' in obj_value: self._notification(context, methodname, - {obj_type + '_id': obj_value['id']}) + {obj_type + '_id': obj_value['id']}, + network_id) else: - self._notification(context, methodname, data) + self._notification(context, methodname, data, network_id) diff --git a/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py new file mode 100644 index 0000000000..c5085cb61a --- /dev/null +++ b/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -0,0 +1,120 @@ +# Copyright (c) 2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from quantum.common import constants +from quantum.common import topics +from quantum.common import utils +from quantum import manager +from quantum.openstack.common import log as logging +from quantum.openstack.common.rpc import proxy + + +LOG = logging.getLogger(__name__) + + +class L3AgentNotifyAPI(proxy.RpcProxy): + """API for plugin to notify L3 agent.""" + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic=topics.L3_AGENT): + super(L3AgentNotifyAPI, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def _notification_host(self, context, method, payload, host): + """Notify the agent that is hosting the router""" + LOG.debug(_('Nofity agent at %(host)s the message ' + '%(method)s'), {'host': host, + 'method': method}) + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topics.L3_AGENT, host)) + + def _agent_notification(self, context, method, routers, + operation, data): + """Notify changed routers to hosting l3 agents. + + Adjust routers according to l3 agents' role and + related dhcp agents. + Notify dhcp agent to get right subnet's gateway ips. + """ + adminContext = context.is_admin and context or context.elevated() + plugin = manager.QuantumManager.get_plugin() + for router in routers: + l3_agents = plugin.get_l3_agents_hosting_routers( + adminContext, [router['id']], + admin_state_up=True, + active=True) + for l3_agent in l3_agents: + LOG.debug(_('Notify agent at %(topic)s.%(host)s the message ' + '%(method)s'), + {'topic': l3_agent.topic, + 'host': l3_agent.host, + 'method': method}) + self.cast( + context, self.make_msg(method, + routers=[router]), + topic='%s.%s' % (l3_agent.topic, l3_agent.host)) + + def _notification(self, context, method, routers, operation, data): + """Notify all the agents that are hosting the routers""" + plugin = manager.QuantumManager.get_plugin() + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + adminContext = (context.is_admin and + context or context.elevated()) + plugin.schedule_routers(adminContext, routers) + self._agent_notification( + context, method, routers, operation, data) + else: + self.fanout_cast( + context, self.make_msg(method, + routers=routers), + topic=topics.L3_AGENT) + + def _notification_fanout(self, context, method, router_id): + """Fanout the deleted router to all L3 agents""" + LOG.debug(_('Fanout notify agent at %(topic)s the message ' + '%(method)s on router %(router_id)s'), + {'topic': topics.DHCP_AGENT, + 'method': method, + 'router_id': router_id}) + self.fanout_cast( + context, self.make_msg(method, + router_id=router_id), + topic=topics.L3_AGENT) + + def agent_updated(self, context, admin_state_up, host): + self._notification_host(context, 'agent_updated', + {'admin_state_up': admin_state_up}, + host) + + def router_deleted(self, context, router_id): + self._notification_fanout(context, 'router_deleted', router_id) + + def routers_updated(self, context, routers, operation=None, data=None): + if routers: + self._notification(context, 'routers_updated', routers, + operation, data) + + def router_removed_from_agent(self, context, router_id, host): + self._notification_host(context, 'router_removed_from_agent', + {'router_id': router_id}, host) + + def router_added_to_agent(self, context, routers, host): + self._notification_host(context, 'router_added_to_agent', + routers, host) + +L3AgentNotify = L3AgentNotifyAPI() diff --git a/quantum/common/constants.py b/quantum/common/constants.py index 3e27fdf145..e1bc25f18c 100644 --- a/quantum/common/constants.py +++ b/quantum/common/constants.py @@ -66,3 +66,5 @@ PAGINATION_INFINITE = 'infinite' SORT_DIRECTION_ASC = 'asc' SORT_DIRECTION_DESC = 'desc' + +AGENT_SCHEDULER_EXT_ALIAS = 'agent_scheduler' diff --git a/quantum/common/utils.py b/quantum/common/utils.py index d6b7c6e257..5522e9a7ff 100644 --- a/quantum/common/utils.py +++ b/quantum/common/utils.py @@ -183,3 +183,8 @@ def diff_list_of_dict(old_list, new_list): added = new_set - old_set removed = old_set - new_set return [str2dict(a) for a in added], [str2dict(r) for r in removed] + + +def is_extension_supported(plugin, ext_alias): + return ext_alias in getattr( + plugin, "supported_extension_aliases", []) diff --git a/quantum/db/agents_db.py b/quantum/db/agents_db.py index 70c56780e1..0f9a236ad2 100644 --- a/quantum/db/agents_db.py +++ b/quantum/db/agents_db.py @@ -67,24 +67,30 @@ class AgentDbMixin(ext_agent.AgentPluginBase): raise ext_agent.AgentNotFound(id=id) return agent - def _is_agent_down(self, heart_beat_time_str): - return timeutils.is_older_than(heart_beat_time_str, + @classmethod + def is_agent_down(cls, heart_beat_time): + return timeutils.is_older_than(heart_beat_time, cfg.CONF.agent_down_time) + def get_configuration_dict(self, agent_db): + try: + conf = jsonutils.loads(agent_db.configurations) + except Exception: + msg = _('Configuration for agent %(agent_type)s on host %(host)s' + ' is invalid.') + LOG.warn(msg, {'agent_type': agent_db.agent_type, + 'host': agent_db.host}) + conf = {} + return conf + def _make_agent_dict(self, agent, fields=None): attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get( ext_agent.RESOURCE_NAME + 's') res = dict((k, agent[k]) for k in attr if k not in ['alive', 'configurations']) - res['alive'] = not self._is_agent_down(res['heartbeat_timestamp']) - try: - res['configurations'] = jsonutils.loads(agent['configurations']) - except Exception: - msg = _('Configurations for agent %(agent_type)s on host %(host)s' - ' are invalid.') - LOG.warn(msg, {'agent_type': res['agent_type'], - 'host': res['host']}) - res['configurations'] = {} + res['alive'] = not AgentDbMixin.is_agent_down( + res['heartbeat_timestamp']) + res['configurations'] = self.get_configuration_dict(agent) return self._fields(res, fields) def delete_agent(self, context, id): @@ -99,6 +105,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase): agent.update(agent_data) return self._make_agent_dict(agent) + def get_agents_db(self, context, filters=None): + query = self._get_collection_query(context, Agent, filters=filters) + return query.all() + def get_agents(self, context, filters=None, fields=None): return self._get_collection(context, Agent, self._make_agent_dict, diff --git a/quantum/db/agentschedulers_db.py b/quantum/db/agentschedulers_db.py new file mode 100644 index 0000000000..5137820f85 --- /dev/null +++ b/quantum/db/agentschedulers_db.py @@ -0,0 +1,363 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import exc +from sqlalchemy.orm import joinedload + +from quantum.api.v2 import attributes +from quantum.common import constants +from quantum.db import agents_db +from quantum.db import model_base +from quantum.db import models_v2 +from quantum.extensions import agentscheduler +from quantum.openstack.common import log as logging +from quantum.openstack.common import uuidutils + + +LOG = logging.getLogger(__name__) + + +class NetworkDhcpAgentBinding(model_base.BASEV2): + """Represents binding between quantum networks and DHCP agents""" + network_id = sa.Column(sa.String(36), + sa.ForeignKey("networks.id", ondelete='CASCADE'), + primary_key=True) + dhcp_agent = orm.relation(agents_db.Agent) + dhcp_agent_id = sa.Column(sa.String(36), + sa.ForeignKey("agents.id", + ondelete='CASCADE'), + primary_key=True) + + +class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId): + """Represents binding between quantum routers and L3 agents""" + router_id = sa.Column(sa.String(36), + sa.ForeignKey("routers.id", ondelete='CASCADE')) + l3_agent = orm.relation(agents_db.Agent) + l3_agent_id = sa.Column(sa.String(36), + sa.ForeignKey("agents.id", + ondelete='CASCADE')) + + +class AgentSchedulerDbMixin(agentscheduler.AgentSchedulerPluginBase): + """Mixin class to add agent scheduler extension to db_plugin_base_v2.""" + + dhcp_agent_notifier = None + l3_agent_notifier = None + network_scheduler = None + router_scheduler = None + + def get_dhcp_agents_hosting_networks( + self, context, network_ids, active=None): + if not network_ids: + return [] + query = context.session.query(NetworkDhcpAgentBinding) + query = query.options(joinedload('dhcp_agent')) + if len(network_ids) == 1: + query = query.filter( + NetworkDhcpAgentBinding.network_id == network_ids[0]) + elif network_ids: + query = query.filter( + NetworkDhcpAgentBinding.network_id in network_ids) + if active is not None: + query = (query.filter(agents_db.Agent.admin_state_up == active)) + dhcp_agents = [binding.dhcp_agent for binding in query.all()] + if active is not None: + dhcp_agents = [dhcp_agent for dhcp_agent in + dhcp_agents if not + agents_db.AgentDbMixin.is_agent_down( + dhcp_agent['heartbeat_timestamp'])] + return dhcp_agents + + def add_network_to_dhcp_agent(self, context, id, network_id): + self._get_network(context, network_id) + with context.session.begin(subtransactions=True): + agent_db = self._get_agent(context, id) + if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or + not agent_db['admin_state_up']): + raise agentscheduler.InvalidDHCPAgent(id=id) + dhcp_agents = self.get_dhcp_agents_hosting_networks( + context, [network_id]) + for dhcp_agent in dhcp_agents: + if id == dhcp_agent.id: + raise agentscheduler.NetworkHostedByDHCPAgent( + network_id=network_id, agent_id=id) + binding = NetworkDhcpAgentBinding() + binding.dhcp_agent_id = id + binding.network_id = network_id + context.session.add(binding) + if self.dhcp_agent_notifier: + self.dhcp_agent_notifier.network_added_to_agent( + context, network_id, agent_db.host) + + def remove_network_from_dhcp_agent(self, context, id, network_id): + agent = self._get_agent(context, id) + with context.session.begin(subtransactions=True): + try: + query = context.session.query(NetworkDhcpAgentBinding) + binding = query.filter( + NetworkDhcpAgentBinding.network_id == network_id, + NetworkDhcpAgentBinding.dhcp_agent_id == id).one() + except exc.NoResultFound: + raise agentscheduler.NetworkNotHostedByDhcpAgent( + network_id=network_id, agent_id=id) + context.session.delete(binding) + if self.dhcp_agent_notifier: + self.dhcp_agent_notifier.network_removed_from_agent( + context, network_id, agent.host) + + def list_networks_on_dhcp_agent(self, context, id): + query = context.session.query(NetworkDhcpAgentBinding.network_id) + net_ids = query.filter( + NetworkDhcpAgentBinding.dhcp_agent_id == id).all() + if net_ids: + _ids = [item[0] for item in net_ids] + return {'networks': + self.get_networks(context, filters={'id': _ids})} + else: + return {'networks': []} + + def list_active_networks_on_active_dhcp_agent(self, context, host): + agent = self._get_agent_by_type_and_host( + context, constants.AGENT_TYPE_DHCP, host) + if not agent.admin_state_up: + return [] + query = context.session.query(NetworkDhcpAgentBinding.network_id) + net_ids = query.filter( + NetworkDhcpAgentBinding.dhcp_agent_id == agent.id).all() + if net_ids: + _ids = [item[0] for item in net_ids] + return self.get_networks( + context, filters={'id': _ids, 'admin_state_up': [True]}) + else: + return [] + + def list_dhcp_agents_hosting_network(self, context, network_id): + dhcp_agents = self.get_dhcp_agents_hosting_networks( + context, [network_id]) + agent_ids = [dhcp_agent.id for dhcp_agent in dhcp_agents] + if agent_ids: + return { + 'agents': + self.get_agents(context, filters={'id': agent_ids})} + else: + return {'agents': []} + + def add_router_to_l3_agent(self, context, id, router_id): + """Add a l3 agent to host a router. + """ + router = self.get_router(context, router_id) + with context.session.begin(subtransactions=True): + agent_db = self._get_agent(context, id) + if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or + not agent_db['admin_state_up'] or + not self.get_l3_agent_candidates(router, [agent_db])): + raise agentscheduler.InvalidL3Agent(id=id) + query = context.session.query(RouterL3AgentBinding) + try: + binding = query.filter( + RouterL3AgentBinding.l3_agent_id == agent_db.id, + RouterL3AgentBinding.router_id == router_id).one() + if binding: + raise agentscheduler.RouterHostedByL3Agent( + router_id=router_id, agent_id=id) + except exc.NoResultFound: + pass + + result = self.auto_schedule_routers(context, + agent_db.host, + router_id) + if not result: + raise agentscheduler.RouterSchedulingFailed( + router_id=router_id, agent_id=id) + + if self.l3_agent_notifier: + routers = self.get_sync_data(context, [router_id]) + self.l3_agent_notifier.router_added_to_agent( + context, routers, agent_db.host) + + def remove_router_from_l3_agent(self, context, id, router_id): + """Remove the router from l3 agent. After it, the router + will be non-hosted until there is update which + lead to re schedule or be added to another agent manually.""" + agent = self._get_agent(context, id) + with context.session.begin(subtransactions=True): + query = context.session.query(RouterL3AgentBinding) + query = query.filter( + RouterL3AgentBinding.router_id == router_id, + RouterL3AgentBinding.l3_agent_id == id) + try: + binding = query.one() + except exc.NoResultFound: + raise agentscheduler.RouterNotHostedByL3Agent( + router_id=router_id, agent_id=id) + context.session.delete(binding) + if self.l3_agent_notifier: + self.l3_agent_notifier.router_removed_from_agent( + context, router_id, agent.host) + + def list_routers_on_l3_agent(self, context, id): + query = context.session.query(RouterL3AgentBinding.router_id) + router_ids = query.filter( + RouterL3AgentBinding.l3_agent_id == id).all() + if router_ids: + _ids = [item[0] for item in router_ids] + return {'routers': + self.get_routers(context, filters={'id': _ids})} + else: + return {'routers': []} + + def list_active_sync_routers_on_active_l3_agent( + self, context, host, router_id): + agent = self._get_agent_by_type_and_host( + context, constants.AGENT_TYPE_L3, host) + if not agent.admin_state_up: + return [] + query = context.session.query(RouterL3AgentBinding.router_id) + query = query.filter( + RouterL3AgentBinding.l3_agent_id == agent.id) + if router_id: + query = query.filter(RouterL3AgentBinding.router_id == router_id) + router_ids = query.all() + if router_ids: + _ids = [item[0] for item in router_ids] + routers = self.get_sync_data(context, router_ids=_ids, + active=True) + return routers + return [] + + def get_l3_agents_hosting_routers(self, context, router_ids, + admin_state_up=None, + active=None): + if not router_ids: + return [] + query = context.session.query(RouterL3AgentBinding) + if len(router_ids) > 1: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id.in_(router_ids)) + else: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id == router_ids[0]) + if admin_state_up is not None: + query = (query.filter(agents_db.Agent.admin_state_up == + admin_state_up)) + l3_agents = [binding.l3_agent for binding in query.all()] + if active is not None: + l3_agents = [l3_agent for l3_agent in + l3_agents if not + agents_db.AgentDbMixin.is_agent_down( + l3_agent['heartbeat_timestamp'])] + return l3_agents + + def _get_l3_bindings_hosting_routers(self, context, router_ids): + if not router_ids: + return [] + query = context.session.query(RouterL3AgentBinding) + if len(router_ids) > 1: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id.in_(router_ids)) + else: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id == router_ids[0]) + return query.all() + + def list_l3_agents_hosting_router(self, context, router_id): + with context.session.begin(subtransactions=True): + bindings = self._get_l3_bindings_hosting_routers( + context, [router_id]) + results = [] + for binding in bindings: + l3_agent_dict = self._make_agent_dict(binding.l3_agent) + results.append(l3_agent_dict) + if results: + return {'agents': results} + else: + return {'agents': []} + + def schedule_network(self, context, request_network, created_network): + if self.network_scheduler: + result = self.network_scheduler.schedule( + self, context, request_network, created_network) + if not result: + LOG.warn(_('Fail scheduling network %s'), created_network) + + def auto_schedule_networks(self, context, host): + if self.network_scheduler: + self.network_scheduler.auto_schedule_networks(self, context, host) + + def get_l3_agents(self, context, active=None, filters=None): + query = context.session.query(agents_db.Agent) + query = query.filter( + agents_db.Agent.agent_type == constants.AGENT_TYPE_L3) + if active is not None: + query = (query.filter(agents_db.Agent.admin_state_up == active)) + if filters: + for key, value in filters.iteritems(): + column = getattr(agents_db.Agent, key, None) + if column: + query = query.filter(column.in_(value)) + l3_agents = query.all() + if active is not None: + l3_agents = [l3_agent for l3_agent in + l3_agents if not + agents_db.AgentDbMixin.is_agent_down( + l3_agent['heartbeat_timestamp'])] + return l3_agents + + def get_l3_agent_candidates(self, sync_router, l3_agents): + """Get the valid l3 agents for the router from a list of l3_agents""" + candidates = [] + for l3_agent in l3_agents: + if not l3_agent.admin_state_up: + continue + agent_conf = self.get_configuration_dict(l3_agent) + router_id = agent_conf.get('router_id', None) + use_namespaces = agent_conf.get('use_namespaces', True) + handle_internal_only_routers = agent_conf.get( + 'handle_internal_only_routers', True) + gateway_external_network_id = agent_conf.get( + 'gateway_external_network_id', None) + if not use_namespaces and router_id != sync_router['id']: + continue + ex_net_id = (sync_router['external_gateway_info'] or {}).get( + 'network_id') + if ((not ex_net_id and not handle_internal_only_routers) or + (ex_net_id and gateway_external_network_id and + ex_net_id != gateway_external_network_id)): + continue + candidates.append(l3_agent) + return candidates + + def auto_schedule_routers(self, context, host, router_id): + if self.router_scheduler: + return self.router_scheduler.auto_schedule_routers( + self, context, host, router_id) + + def schedule_router(self, context, router): + if self.router_scheduler: + return self.router_scheduler.schedule( + self, context, router) + + def schedule_routers(self, context, routers): + """Schedule the routers to l3 agents. + """ + for router in routers: + self.schedule_router(context, router) diff --git a/quantum/db/dhcp_rpc_base.py b/quantum/db/dhcp_rpc_base.py index 78a327d56c..9581472513 100644 --- a/quantum/db/dhcp_rpc_base.py +++ b/quantum/db/dhcp_rpc_base.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo.config import cfg from sqlalchemy.orm import exc from quantum.api.v2 import attributes +from quantum.common import constants +from quantum.common import utils from quantum import manager from quantum.openstack.common import log as logging @@ -31,14 +34,24 @@ class DhcpRpcCallbackMixin(object): host = kwargs.get('host') LOG.debug(_('Network list requested from %s'), host) plugin = manager.QuantumManager.get_plugin() - filters = dict(admin_state_up=[True]) - - return [net['id'] for net in - plugin.get_networks(context, filters=filters)] + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + if cfg.CONF.network_auto_schedule: + plugin.auto_schedule_networks(context, host) + nets = plugin.list_active_networks_on_active_dhcp_agent( + context, host) + else: + filters = dict(admin_state_up=[True]) + nets = plugin.get_networks(context, filters=filters) + return [net['id'] for net in nets] def get_network_info(self, context, **kwargs): """Retrieve and return a extended information about a network.""" network_id = kwargs.get('network_id') + host = kwargs.get('host') + LOG.debug(_('Network %(network_id)s requested from ' + '%(host)s'), {'network_id': network_id, + 'host': host}) plugin = manager.QuantumManager.get_plugin() network = plugin.get_network(context, network_id) @@ -62,7 +75,9 @@ class DhcpRpcCallbackMixin(object): # a device id that combines host and network ids LOG.debug(_('Port %(device_id)s for %(network_id)s requested from ' - '%(host)s'), locals()) + '%(host)s'), {'device_id': device_id, + 'network_id': network_id, + 'host': host}) plugin = manager.QuantumManager.get_plugin() retval = None diff --git a/quantum/db/extraroute_db.py b/quantum/db/extraroute_db.py index 77e4265609..82425278d8 100644 --- a/quantum/db/extraroute_db.py +++ b/quantum/db/extraroute_db.py @@ -154,11 +154,12 @@ class ExtraRoute_db_mixin(l3_db.L3_NAT_db_mixin): context, router['id']) return routers - def get_sync_data(self, context, router_ids=None): + def get_sync_data(self, context, router_ids=None, active=None): """Query routers and their related floating_ips, interfaces.""" with context.session.begin(subtransactions=True): routers = super(ExtraRoute_db_mixin, - self).get_sync_data(context, router_ids) + self).get_sync_data(context, router_ids, + active=active) for router in routers: router['routes'] = self._get_extra_routes_by_router_id( context, router['id']) diff --git a/quantum/db/l3_db.py b/quantum/db/l3_db.py index dd51d11040..943ed993bd 100644 --- a/quantum/db/l3_db.py +++ b/quantum/db/l3_db.py @@ -23,11 +23,11 @@ from sqlalchemy import orm from sqlalchemy.orm import exc from sqlalchemy.sql import expression as expr +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import constants as l3_constants from quantum.common import exceptions as q_exc from quantum.db import db_base_plugin_v2 -from quantum.db import l3_rpc_agent_api from quantum.db import model_base from quantum.db import models_v2 from quantum.extensions import l3 @@ -328,7 +328,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): if len(fixed_ips) != 1: msg = _('Router port must have exactly one fixed IP') raise q_exc.BadRequest(resource='router', msg=msg) - subnet = self._get_subnet(context, fixed_ips[0]['subnet_id']) + subnet_id = fixed_ips[0]['subnet_id'] + subnet = self._get_subnet(context, subnet_id) self._check_for_dup_router_subnet(context, router_id, port['network_id'], subnet['id'], @@ -360,7 +361,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'name': ''}}) routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, routers, 'add_router_interface', + {'network_id': port['network_id'], + 'subnet_id': subnet_id}) info = {'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} notifier_api.notify(context, @@ -409,6 +413,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): subnet_id = port_db['fixed_ips'][0]['subnet_id'] self._confirm_router_interface_not_in_use( context, router_id, subnet_id) + _network_id = port_db['network_id'] self.delete_port(context, port_db['id'], l3_port_check=False) elif 'subnet_id' in interface_info: subnet_id = interface_info['subnet_id'] @@ -428,6 +433,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): for p in ports: if p['fixed_ips'][0]['subnet_id'] == subnet_id: port_id = p['id'] + _network_id = p['network_id'] self.delete_port(context, p['id'], l3_port_check=False) found = True break @@ -438,7 +444,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id, subnet_id=subnet_id) routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, routers, 'remove_router_interface', + {'network_id': _network_id, + 'subnet_id': subnet_id}) notifier_api.notify(context, notifier_api.publisher_id('network'), 'router.interface.delete', @@ -649,7 +658,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router_id = floatingip_db['router_id'] if router_id: routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'create_floatingip') return self._make_floatingip_dict(floatingip_db) def update_floatingip(self, context, id, floatingip): @@ -671,7 +681,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router_ids.append(router_id) if router_ids: routers = self.get_sync_data(context.elevated(), router_ids) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'update_floatingip') return self._make_floatingip_dict(floatingip_db) def delete_floatingip(self, context, id): @@ -684,7 +695,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): l3_port_check=False) if router_id: routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'delete_floatingip') def get_floatingip(self, context, id, fields=None): floatingip = self._get_floatingip(context, id) @@ -816,7 +828,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): else: return [n for n in nets if n['id'] not in ext_nets] - def _get_sync_routers(self, context, router_ids=None): + def _get_sync_routers(self, context, router_ids=None, active=None): """Query routers and their gw ports for l3 agent. Query routers with the router_ids. The gateway ports, if any, @@ -831,7 +843,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): """ router_query = context.session.query(Router) if router_ids: - router_query = router_query.filter(Router.id.in_(router_ids)) + if 1 == len(router_ids): + router_query = router_query.filter(Router.id == router_ids[0]) + else: + router_query = router_query.filter(Router.id.in_(router_ids)) + if active is not None: + router_query = router_query.filter(Router.admin_state_up == active) routers = router_query.all() gw_port_ids = [] if not routers: @@ -842,7 +859,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): gw_port_ids.append(gw_port_id) gw_ports = [] if gw_port_ids: - gw_ports = self._get_sync_gw_ports(context, gw_port_ids) + gw_ports = self.get_sync_gw_ports(context, gw_port_ids) gw_port_id_gw_port_dict = {} for gw_port in gw_ports: gw_port_id_gw_port_dict[gw_port['id']] = gw_port @@ -862,7 +879,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): return [] return self.get_floatingips(context, {'router_id': router_ids}) - def _get_sync_gw_ports(self, context, gw_port_ids): + def get_sync_gw_ports(self, context, gw_port_ids): if not gw_port_ids: return [] filters = {'id': gw_port_ids} @@ -871,12 +888,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): self._populate_subnet_for_ports(context, gw_ports) return gw_ports - def _get_sync_interfaces(self, context, router_ids): + def get_sync_interfaces(self, context, router_ids, + device_owner=DEVICE_OWNER_ROUTER_INTF): """Query router interfaces that relate to list of router_ids.""" if not router_ids: return [] filters = {'device_id': router_ids, - 'device_owner': [DEVICE_OWNER_ROUTER_INTF]} + 'device_owner': [device_owner]} interfaces = self.get_ports(context, filters) if interfaces: self._populate_subnet_for_ports(context, interfaces) @@ -934,14 +952,15 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router[l3_constants.INTERFACE_KEY] = router_interfaces return routers_dict.values() - def get_sync_data(self, context, router_ids=None): + def get_sync_data(self, context, router_ids=None, active=None): """Query routers and their related floating_ips, interfaces.""" with context.session.begin(subtransactions=True): routers = self._get_sync_routers(context, - router_ids) + router_ids=router_ids, + active=active) router_ids = [router['id'] for router in routers] floating_ips = self._get_sync_floating_ips(context, router_ids) - interfaces = self._get_sync_interfaces(context, router_ids) + interfaces = self.get_sync_interfaces(context, router_ids) return self._process_sync_data(routers, interfaces, floating_ips) def get_external_network_id(self, context): diff --git a/quantum/db/l3_rpc_agent_api.py b/quantum/db/l3_rpc_agent_api.py deleted file mode 100644 index 718d9c48e4..0000000000 --- a/quantum/db/l3_rpc_agent_api.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright (c) 2012 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from quantum.common import topics -from quantum.openstack.common import jsonutils -from quantum.openstack.common import log as logging -from quantum.openstack.common.rpc import proxy - - -LOG = logging.getLogger(__name__) - - -class L3AgentNotifyAPI(proxy.RpcProxy): - """API for plugin to notify L3 agent.""" - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic=topics.L3_AGENT): - super(L3AgentNotifyAPI, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def router_deleted(self, context, router_id): - LOG.debug(_('Notify agent the router %s is deleted'), router_id) - self.cast(context, - self.make_msg('router_deleted', - router_id=router_id), - topic=self.topic) - - def routers_updated(self, context, routers): - if routers: - LOG.debug(_('Notify agent routers were updated:\n %s'), - jsonutils.dumps(routers, indent=5)) - self.cast(context, - self.make_msg('routers_updated', - routers=routers), - topic=self.topic) - - -L3AgentNotify = L3AgentNotifyAPI() diff --git a/quantum/db/l3_rpc_base.py b/quantum/db/l3_rpc_base.py index 2a11b701e7..4a8635679c 100644 --- a/quantum/db/l3_rpc_base.py +++ b/quantum/db/l3_rpc_base.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo.config import cfg + +from quantum.common import constants +from quantum.common import utils from quantum import context as quantum_context from quantum import manager from quantum.openstack.common import jsonutils @@ -34,10 +38,17 @@ class L3RpcCallbackMixin(object): with their interfaces and floating_ips """ router_id = kwargs.get('router_id') - # TODO(gongysh) we will use host in kwargs for multi host BP + host = kwargs.get('host') context = quantum_context.get_admin_context() plugin = manager.QuantumManager.get_plugin() - routers = plugin.get_sync_data(context, router_id) + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + if cfg.CONF.router_auto_schedule: + plugin.auto_schedule_routers(context, host, router_id) + routers = plugin.list_active_sync_routers_on_active_l3_agent( + context, host, router_id) + else: + routers = plugin.get_sync_data(context, router_id) LOG.debug(_("Routers returned to l3 agent:\n %s"), jsonutils.dumps(routers, indent=5)) return routers diff --git a/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py b/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py new file mode 100644 index 0000000000..2d28fff2cb --- /dev/null +++ b/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py @@ -0,0 +1,79 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""agent scheduler + +Revision ID: 4692d074d587 +Revises: 3b54bf9e29f7 +Create Date: 2013-02-21 23:01:50.370306 + +""" + +# revision identifiers, used by Alembic. +revision = '4692d074d587' +down_revision = '3b54bf9e29f7' + +# Change to ['*'] if this migration applies to all plugins + +migration_for_plugins = [ + 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2' +] + +from alembic import op +import sqlalchemy as sa + + +from quantum.db import migration + + +def upgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.create_table( + 'networkdhcpagentbindings', + sa.Column('network_id', sa.String(length=36), nullable=False), + sa.Column('dhcp_agent_id', sa.String(length=36), nullable=False), + sa.ForeignKeyConstraint(['dhcp_agent_id'], ['agents.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['network_id'], ['networks.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('network_id', 'dhcp_agent_id') + ) + op.create_table( + 'routerl3agentbindings', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('router_id', sa.String(length=36), nullable=True), + sa.Column('l3_agent_id', sa.String(length=36), nullable=True), + sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['router_id'], ['routers.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + ### end Alembic commands ### + + +def downgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('routerl3agentbindings') + op.drop_table('networkdhcpagentbindings') + ### end Alembic commands ### diff --git a/quantum/extensions/agentscheduler.py b/quantum/extensions/agentscheduler.py new file mode 100644 index 0000000000..73ae370e2f --- /dev/null +++ b/quantum/extensions/agentscheduler.py @@ -0,0 +1,252 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack, LLC. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from abc import abstractmethod + +from quantum.api import extensions +from quantum.api.v2 import base +from quantum.api.v2 import resource +from quantum.common import constants +from quantum.common import exceptions +from quantum.extensions import agent +from quantum import manager +from quantum import policy +from quantum import wsgi + +DHCP_NET = 'dhcp-network' +DHCP_NETS = DHCP_NET + 's' +DHCP_AGENT = 'dhcp-agent' +DHCP_AGENTS = DHCP_AGENT + 's' +L3_ROUTER = 'l3-router' +L3_ROUTERS = L3_ROUTER + 's' +L3_AGENT = 'l3-agent' +L3_AGENTS = L3_AGENT + 's' + + +class NetworkSchedulerController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % DHCP_NETS, + {}, + plugin=plugin) + return plugin.list_networks_on_dhcp_agent( + request.context, kwargs['agent_id']) + + def create(self, request, body, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "create_%s" % DHCP_NET, + {}, + plugin=plugin) + return plugin.add_network_to_dhcp_agent( + request.context, kwargs['agent_id'], body['network_id']) + + def delete(self, request, id, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "delete_%s" % DHCP_NET, + {}, + plugin=plugin) + return plugin.remove_network_from_dhcp_agent( + request.context, kwargs['agent_id'], id) + + +class RouterSchedulerController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % L3_ROUTERS, + {}, + plugin=plugin) + return plugin.list_routers_on_l3_agent( + request.context, kwargs['agent_id']) + + def create(self, request, body, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "create_%s" % L3_ROUTER, + {}, + plugin=plugin) + return plugin.add_router_to_l3_agent( + request.context, + kwargs['agent_id'], + body['router_id']) + + def delete(self, request, id, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "delete_%s" % L3_ROUTER, + {}, + plugin=plugin) + return plugin.remove_router_from_l3_agent( + request.context, kwargs['agent_id'], id) + + +class DhcpAgentsHostingNetworkController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % DHCP_AGENTS, + {}, + plugin=plugin) + return plugin.list_dhcp_agents_hosting_network( + request.context, kwargs['network_id']) + + +class L3AgentsHostingRouterController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % L3_AGENTS, + {}, + plugin=plugin) + return plugin.list_l3_agents_hosting_router( + request.context, kwargs['router_id']) + + +class Agentscheduler(extensions.ExtensionDescriptor): + """Extension class supporting agent scheduler. + """ + + @classmethod + def get_name(cls): + return "Agent Schedulers" + + @classmethod + def get_alias(cls): + return constants.AGENT_SCHEDULER_EXT_ALIAS + + @classmethod + def get_description(cls): + return "Schedule resources among agents" + + @classmethod + def get_namespace(cls): + return "http://docs.openstack.org/ext/agent_scheduler/api/v1.0" + + @classmethod + def get_updated(cls): + return "2013-02-03T10:00:00-00:00" + + @classmethod + def get_resources(cls): + """Returns Ext Resources """ + exts = [] + parent = dict(member_name="agent", + collection_name="agents") + controller = resource.Resource(NetworkSchedulerController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + DHCP_NETS, controller, parent)) + + controller = resource.Resource(RouterSchedulerController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + L3_ROUTERS, controller, parent)) + + parent = dict(member_name="network", + collection_name="networks") + + controller = resource.Resource(DhcpAgentsHostingNetworkController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + DHCP_AGENTS, controller, parent)) + + parent = dict(member_name="router", + collection_name="routers") + + controller = resource.Resource(L3AgentsHostingRouterController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + L3_AGENTS, controller, parent)) + return exts + + def get_extended_resources(self, version): + return {} + + +class InvalidDHCPAgent(agent.AgentNotFound): + message = _("Agent %(id)s is not a valid DHCP Agent or has been disabled") + + +class NetworkHostedByDHCPAgent(exceptions.Conflict): + message = _("The network %(network_id)s has been already hosted" + " by the DHCP Agent %(agent_id)s.") + + +class NetworkNotHostedByDhcpAgent(exceptions.Conflict): + message = _("The network %(network_id)s is not hosted" + " by the DHCP agent %(agent_id)s.") + + +class InvalidL3Agent(agent.AgentNotFound): + message = _("Agent %(id)s is not a L3 Agent or has been disabled") + + +class RouterHostedByL3Agent(exceptions.Conflict): + message = _("The router %(router_id)s has been already hosted" + " by the L3 Agent %(agent_id)s.") + + +class RouterSchedulingFailed(exceptions.Conflict): + message = _("Failed scheduling router %(router_id)s to" + " the L3 Agent %(agent_id)s.") + + +class RouterNotHostedByL3Agent(exceptions.Conflict): + message = _("The router %(router_id)s is not hosted" + " by L3 agent %(agent_id)s.") + + +class AgentSchedulerPluginBase(object): + """ REST API to operate the agent scheduler. + + All of method must be in an admin context. + """ + + @abstractmethod + def add_network_to_dhcp_agent(self, context, id, network_id): + pass + + @abstractmethod + def remove_network_from_dhcp_agent(self, context, id, network_id): + pass + + @abstractmethod + def list_networks_on_dhcp_agent(self, context, id): + pass + + @abstractmethod + def list_dhcp_agents_hosting_network(self, context, network_id): + pass + + @abstractmethod + def add_router_to_l3_agent(self, context, id, router_id): + pass + + @abstractmethod + def remove_router_from_l3_agent(self, context, id, router_id): + pass + + @abstractmethod + def list_routers_on_l3_agent(self, context, id): + pass + + @abstractmethod + def list_l3_agents_hosting_router(self, context, router_id): + pass diff --git a/quantum/plugins/openvswitch/common/config.py b/quantum/plugins/openvswitch/common/config.py index 6f16e3cdbb..4886974ddf 100644 --- a/quantum/plugins/openvswitch/common/config.py +++ b/quantum/plugins/openvswitch/common/config.py @@ -17,6 +17,7 @@ from oslo.config import cfg from quantum.agent.common import config +from quantum import scheduler DEFAULT_BRIDGE_MAPPINGS = [] @@ -64,3 +65,4 @@ cfg.CONF.register_opts(ovs_opts, "OVS") cfg.CONF.register_opts(agent_opts, "AGENT") config.register_agent_state_opts_helper(cfg.CONF) config.register_root_helper(cfg.CONF) +cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS) diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index ce22d9edb5..86354fbebd 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -25,12 +25,15 @@ import sys from oslo.config import cfg from quantum.agent import securitygroups_rpc as sg_rpc +from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import constants as q_const from quantum.common import exceptions as q_exc from quantum.common import rpc as q_rpc from quantum.common import topics from quantum.db import agents_db +from quantum.db import agentschedulers_db from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base from quantum.db import extraroute_db @@ -41,6 +44,7 @@ from quantum.db import securitygroups_rpc_base as sg_db_rpc from quantum.extensions import portbindings from quantum.extensions import providernet as provider from quantum.extensions import securitygroup as ext_sg +from quantum.openstack.common import importutils from quantum.openstack.common import log as logging from quantum.openstack.common import rpc from quantum.openstack.common.rpc import proxy @@ -211,7 +215,9 @@ class AgentNotifierApi(proxy.RpcProxy, class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, extraroute_db.ExtraRoute_db_mixin, sg_db_rpc.SecurityGroupServerRpcMixin, - agents_db.AgentDbMixin): + agents_db.AgentDbMixin, + agentschedulers_db.AgentSchedulerDbMixin): + """Implement the Quantum abstractions using Open vSwitch. Depending on whether tunneling is enabled, either a GRE tunnel or @@ -238,8 +244,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, supported_extension_aliases = ["provider", "router", "binding", "quotas", "security-group", - "agent", - "extraroute"] + "agent", "extraroute", "agent_scheduler"] network_view = "extension:provider_network:view" network_set = "extension:provider_network:set" @@ -269,12 +274,18 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, "Agent terminated!")) sys.exit(1) self.setup_rpc() + self.network_scheduler = importutils.import_object( + cfg.CONF.network_scheduler_driver) + self.router_scheduler = importutils.import_object( + cfg.CONF.router_scheduler_driver) def setup_rpc(self): # RPC support self.topic = topics.PLUGIN self.conn = rpc.create_connection(new=True) self.notifier = AgentNotifierApi(topics.AGENT) + self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify self.callbacks = OVSRpcCallbacks(self.notifier) self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, @@ -486,6 +497,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self._extend_network_dict_l3(context, net) # note - exception will rollback entire transaction LOG.debug(_("Created network: %s"), net['id']) + self.schedule_network(context, network['network'], net) return net def update_network(self, context, id, network): @@ -569,6 +581,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, else: self.notifier.security_groups_member_updated( context, port.get(ext_sg.SECURITYGROUPS)) + net = self.get_network(context, port['network_id']) + self.schedule_network(context, None, net) return self._extend_port_dict_binding(context, port) def get_port(self, context, id, fields=None): @@ -636,3 +650,20 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self.notifier.security_groups_member_updated( context, port.get(ext_sg.SECURITYGROUPS)) + + def update_agent(self, context, id, agent): + original_agent = self.get_agent(context, id) + result = super(OVSQuantumPluginV2, self).update_agent( + context, id, agent) + agent_data = agent['agent'] + if ('admin_state_up' in agent_data and + original_agent['admin_state_up'] != agent_data['admin_state_up']): + if original_agent['agent_type'] == q_const.AGENT_TYPE_DHCP: + self.dhcp_agent_notifier.agent_updated( + context, agent_data['admin_state_up'], + original_agent['host']) + elif original_agent['agent_type'] == q_const.AGENT_TYPE_L3: + self.l3_agent_notifier.agent_updated( + context, agent_data['admin_state_up'], + original_agent['host']) + return result diff --git a/quantum/policy.py b/quantum/policy.py index f9dc76df20..d8e31456e0 100644 --- a/quantum/policy.py +++ b/quantum/policy.py @@ -51,6 +51,7 @@ def init(): raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file) # pass _set_brain to read_cached_file so that the policy brain # is reset only if the file has changed + LOG.debug(_("loading policy file at %s"), _POLICY_PATH) utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE, reload_func=_set_rules) diff --git a/quantum/scheduler/__init__.py b/quantum/scheduler/__init__.py new file mode 100644 index 0000000000..082601856d --- /dev/null +++ b/quantum/scheduler/__init__.py @@ -0,0 +1,34 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + + +AGENTS_SCHEDULER_OPTS = [ + cfg.StrOpt('network_scheduler_driver', + default='quantum.scheduler.' + 'dhcp_agent_scheduler.ChanceScheduler', + help=_('Driver to use for scheduling network to DHCP agent')), + cfg.StrOpt('router_scheduler_driver', + default='quantum.scheduler.l3_agent_scheduler.ChanceScheduler', + help=_('Driver to use for scheduling ' + 'router to a default L3 agent')), + cfg.BoolOpt('network_auto_schedule', default=True, + help=_('Allow auto scheduling networks to DHCP agent.')), + cfg.BoolOpt('router_auto_schedule', default=True, + help=_('Allow auto scheduling routers to L3 agent.')), +] diff --git a/quantum/scheduler/dhcp_agent_scheduler.py b/quantum/scheduler/dhcp_agent_scheduler.py new file mode 100644 index 0000000000..62929889cb --- /dev/null +++ b/quantum/scheduler/dhcp_agent_scheduler.py @@ -0,0 +1,108 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from sqlalchemy.orm import exc +from sqlalchemy.sql import exists + +from quantum.common import constants +from quantum.db import models_v2 +from quantum.db import agents_db +from quantum.db import agentschedulers_db +from quantum.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class ChanceScheduler(object): + """Allocate a DHCP agent for a network in a random way. + More sophisticated scheduler (similar to filter scheduler in nova?) + can be introduced later.""" + + def schedule(self, plugin, context, request_network, network): + """Schedule the network to an active DHCP agent if there + is no active DHCP agent hosting it. + """ + #TODO(gongysh) don't schedule the networks with only + # subnets whose enable_dhcp is false + with context.session.begin(subtransactions=True): + dhcp_agents = plugin.get_dhcp_agents_hosting_networks( + context, [network['id']], active=True) + if dhcp_agents: + LOG.debug(_('Network %s is hosted already'), + network['id']) + return False + enabled_dhcp_agents = plugin.get_agents_db( + context, filters={ + 'agent_type': [constants.AGENT_TYPE_DHCP], + 'admin_state_up': [True]}) + if not enabled_dhcp_agents: + LOG.warn(_('No enabled DHCP agents')) + return False + active_dhcp_agents = [enabled_dhcp_agent for enabled_dhcp_agent in + enabled_dhcp_agents if not + agents_db.AgentDbMixin.is_agent_down( + enabled_dhcp_agent['heartbeat_timestamp'])] + if not active_dhcp_agents: + LOG.warn(_('No active DHCP agents')) + return False + chosen_agent = random.choice(active_dhcp_agents) + binding = agentschedulers_db.NetworkDhcpAgentBinding() + binding.dhcp_agent = chosen_agent + binding.network_id = network['id'] + context.session.add(binding) + LOG.debug(_('Network %(network_id)s is scheduled to be hosted by ' + 'DHCP agent %(agent_id)s'), + {'network_id': network['id'], + 'agent_id': chosen_agent['id']}) + return True + + def auto_schedule_networks(self, plugin, context, host): + """Schedule non-hosted networks to the DHCP agent on + the specified host.""" + with context.session.begin(subtransactions=True): + query = context.session.query(agents_db.Agent) + query = query.filter(agents_db.Agent.agent_type == + constants.AGENT_TYPE_DHCP, + agents_db.Agent.host == host, + agents_db.Agent.admin_state_up == True) + try: + dhcp_agent = query.one() + except (exc.MultipleResultsFound, exc.NoResultFound): + LOG.warn(_('No enabled DHCP agent on host %s'), + host) + return False + if agents_db.AgentDbMixin.is_agent_down( + dhcp_agent.heartbeat_timestamp): + LOG.warn(_('DHCP agent %s is not active'), dhcp_agent.id) + #TODO(gongysh) consider the disabled agent's network + net_stmt = ~exists().where( + models_v2.Network.id == + agentschedulers_db.NetworkDhcpAgentBinding.network_id) + net_ids = context.session.query( + models_v2.Network.id).filter(net_stmt).all() + if not net_ids: + LOG.debug(_('No non-hosted networks')) + return False + for net_id in net_ids: + binding = agentschedulers_db.NetworkDhcpAgentBinding() + binding.dhcp_agent = dhcp_agent + binding.network_id = net_id[0] + context.session.add(binding) + return True diff --git a/quantum/scheduler/l3_agent_scheduler.py b/quantum/scheduler/l3_agent_scheduler.py new file mode 100644 index 0000000000..0d3b1efbb5 --- /dev/null +++ b/quantum/scheduler/l3_agent_scheduler.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from sqlalchemy.orm import exc +from sqlalchemy.sql import exists + +from quantum.common import constants +from quantum.db import l3_db +from quantum.db import agents_db +from quantum.db import agentschedulers_db +from quantum.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class ChanceScheduler(object): + """Allocate a L3 agent for a router in a random way. + More sophisticated scheduler (similar to filter scheduler in nova?) + can be introduced later.""" + + def auto_schedule_routers(self, plugin, context, host, router_id): + """Schedule non-hosted routers to L3 Agent running on host. + If router_id is given, only this router is scheduled + if it is not hosted yet. + Don't schedule the routers which are hosted already + by active l3 agents. + """ + with context.session.begin(subtransactions=True): + # query if we have valid l3 agent on the host + query = context.session.query(agents_db.Agent) + query = query.filter(agents_db.Agent.agent_type == + constants.AGENT_TYPE_L3, + agents_db.Agent.host == host, + agents_db.Agent.admin_state_up == True) + try: + l3_agent = query.one() + except (exc.MultipleResultsFound, exc.NoResultFound): + LOG.debug(_('No enabled L3 agent on host %s'), + host) + return False + if agents_db.AgentDbMixin.is_agent_down( + l3_agent.heartbeat_timestamp): + LOG.warn(_('L3 agent %s is not active'), l3_agent.id) + # check if the specified router is hosted + if router_id: + l3_agents = plugin.get_l3_agents_hosting_routers( + context, [router_id], admin_state_up=True) + if l3_agents: + LOG.debug(_('Router %(router_id)s has already been hosted' + ' by L3 agent %(agent_id)s'), + {'router_id': router_id, + 'agent_id': l3_agents[0]['id']}) + return False + + # get the router ids + if router_id: + router_ids = [(router_id,)] + else: + # get all routers that are not hosted + #TODO(gongysh) consider the disabled agent's router + stmt = ~exists().where( + l3_db.Router.id == + agentschedulers_db.RouterL3AgentBinding.router_id) + router_ids = context.session.query( + l3_db.Router.id).filter(stmt).all() + if not router_ids: + LOG.debug(_('No non-hosted routers')) + return False + + # check if the configuration of l3 agent is compatible + # with the router + router_ids = [router_id[0] for router_id in router_ids] + routers = plugin.get_routers(context, filters={'id': router_ids}) + to_removed_ids = [] + for router in routers: + candidates = plugin.get_l3_agent_candidates(router, [l3_agent]) + if not candidates: + to_removed_ids.append(router['id']) + router_ids = list(set(router_ids) - set(to_removed_ids)) + if not router_ids: + LOG.warn(_('No routers compatible with L3 agent configuration' + ' on host %s', host)) + return False + + # binding + for router_id in router_ids: + binding = agentschedulers_db.RouterL3AgentBinding() + binding.l3_agent = l3_agent + binding.router_id = router_id + binding.default = True + context.session.add(binding) + return True + + def schedule(self, plugin, context, sync_router): + """Schedule the router to an active L3 agent if there + is no enable L3 agent hosting it. + """ + with context.session.begin(subtransactions=True): + # allow one router is hosted by just + # one enabled l3 agent hosting since active is just a + # timing problem. Non-active l3 agent can return to + # active any time + l3_agents = plugin.get_l3_agents_hosting_routers( + context, [sync_router['id']], admin_state_up=True) + if l3_agents: + LOG.debug(_('Router %(router_id)s has already been hosted' + ' by L3 agent %(agent_id)s'), + {'router_id': sync_router['id'], + 'agent_id': l3_agents[0]['id']}) + return False + + active_l3_agents = plugin.get_l3_agents(context, active=True) + if not active_l3_agents: + LOG.warn(_('No active L3 agents')) + return False + candidates = plugin.get_l3_agent_candidates(sync_router, + active_l3_agents) + if not candidates: + LOG.warn(_('No L3 agents can host the router %s'), + sync_router['id']) + return False + + chosen_agent = random.choice(candidates) + binding = agentschedulers_db.RouterL3AgentBinding() + binding.l3_agent = chosen_agent + binding.router_id = sync_router['id'] + context.session.add(binding) + LOG.debug(_('Router %(router_id)s is scheduled to ' + 'L3 agent %(agent_id)s'), + {'router_id': sync_router['id'], + 'agent_id': chosen_agent['id']}) + return True diff --git a/quantum/tests/unit/openvswitch/test_agent_scheduler.py b/quantum/tests/unit/openvswitch/test_agent_scheduler.py new file mode 100644 index 0000000000..38c74ca9bd --- /dev/null +++ b/quantum/tests/unit/openvswitch/test_agent_scheduler.py @@ -0,0 +1,803 @@ +# Copyright (c) 2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import copy + +import mock +from webob import exc + +from quantum.api import extensions +from quantum.common import constants +from quantum import context +from quantum.db import agents_db +from quantum.db import dhcp_rpc_base +from quantum.db import l3_rpc_base +from quantum.extensions import agentscheduler +from quantum import manager +from quantum.openstack.common import uuidutils +from quantum.plugins.openvswitch.ovs_quantum_plugin import OVSQuantumPluginV2 +from quantum.tests.unit import test_agent_ext_plugin +from quantum.tests.unit.testlib_api import create_request +from quantum.tests.unit import test_db_plugin as test_plugin +from quantum.tests.unit import test_extensions +from quantum.tests.unit import test_l3_plugin +from quantum.wsgi import Serializer + +L3_HOSTA = 'hosta' +DHCP_HOSTA = 'hosta' +L3_HOSTB = 'hostb' +DHCP_HOSTC = 'hostc' + + +class AgentSchedulerTestMixIn(object): + + def _request_list(self, path, admin_context=True, + expected_code=exc.HTTPOk.code): + req = self._path_req(path, admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + return self.deserialize(self.fmt, res) + + def _path_req(self, path, method='GET', data=None, + query_string=None, + admin_context=True): + content_type = 'application/%s' % self.fmt + body = None + if data is not None: # empty dict is valid + body = Serializer().serialize(data, content_type) + if admin_context: + return create_request( + path, body, content_type, method, query_string=query_string) + else: + return create_request( + path, body, content_type, method, query_string=query_string, + context=context.Context('', 'tenant_id')) + + def _path_create_request(self, path, data, admin_context=True): + return self._path_req(path, method='POST', data=data, + admin_context=admin_context) + + def _path_show_request(self, path, admin_context=True): + return self._path_req(path, admin_context=admin_context) + + def _path_delete_request(self, path, admin_context=True): + return self._path_req(path, method='DELETE', + admin_context=admin_context) + + def _path_update_request(self, path, data, admin_context=True): + return self._path_req(path, method='PUT', data=data, + admin_context=admin_context) + + def _list_routers_hosted_by_l3_agent(self, agent_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (agent_id, + agentscheduler.L3_ROUTERS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_networks_hosted_by_dhcp_agent(self, agent_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (agent_id, + agentscheduler.DHCP_NETS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_l3_agents_hosting_router(self, router_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/routers/%s/%s.%s" % (router_id, + agentscheduler.L3_AGENTS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_dhcp_agents_hosting_network(self, network_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/networks/%s/%s.%s" % (network_id, + agentscheduler.DHCP_AGENTS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _add_router_to_l3_agent(self, id, router_id, + expected_code=exc.HTTPCreated.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (id, + agentscheduler.L3_ROUTERS, + self.fmt) + req = self._path_create_request(path, + {'router_id': router_id}, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _add_network_to_dhcp_agent(self, id, network_id, + expected_code=exc.HTTPCreated.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (id, + agentscheduler.DHCP_NETS, + self.fmt) + req = self._path_create_request(path, + {'network_id': network_id}, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _remove_network_from_dhcp_agent(self, id, network_id, + expected_code=exc.HTTPNoContent.code, + admin_context=True): + path = "/agents/%s/%s/%s.%s" % (id, + agentscheduler.DHCP_NETS, + network_id, + self.fmt) + req = self._path_delete_request(path, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _remove_router_from_l3_agent(self, id, router_id, + expected_code=exc.HTTPNoContent.code, + admin_context=True): + path = "/agents/%s/%s/%s.%s" % (id, + agentscheduler.L3_ROUTERS, + router_id, + self.fmt) + req = self._path_delete_request(path, admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _register_one_agent_state(self, agent_state): + callback = agents_db.AgentExtRpcCallback() + callback.report_state(self.adminContext, + agent_state={'agent_state': agent_state}) + + def _disable_agent(self, agent_id, admin_state_up=False): + new_agent = {} + new_agent['agent'] = {} + new_agent['agent']['admin_state_up'] = admin_state_up + self._update('agents', agent_id, new_agent) + + def _get_agent_id(self, agent_type, host): + agents = self._list_agents() + for agent in agents['agents']: + if (agent['agent_type'] == agent_type and + agent['host'] == host): + return agent['id'] + + +class AgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin, + test_agent_ext_plugin.AgentDBTestMixIn, + AgentSchedulerTestMixIn, + test_plugin.QuantumDbPluginV2TestCase): + fmt = 'json' + + def setUp(self): + plugin = ('quantum.plugins.openvswitch.' + 'ovs_quantum_plugin.OVSQuantumPluginV2') + self.dhcp_notifier_cls_p = mock.patch( + 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.' + 'DhcpAgentNotifyAPI') + self.dhcp_notifier = mock.Mock(name='dhcp_notifier') + self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start() + self.dhcp_notifier_cls.return_value = self.dhcp_notifier + super(AgentSchedulerTestCase, self).setUp(plugin) + ext_mgr = extensions.PluginAwareExtensionManager.get_instance() + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + self.adminContext = context.get_admin_context() + self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin() + self.addCleanup(self.dhcp_notifier_cls_p.stop) + + def test_report_states(self): + self._register_agent_states() + agents = self._list_agents() + self.assertEqual(4, len(agents['agents'])) + + def test_network_scheduling_on_network_creation(self): + self._register_agent_states() + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + + def test_network_auto_schedule_with_disabled(self): + with contextlib.nested(self.network(), + self.network()): + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + self._disable_agent(hosta_id) + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + # second agent will host all the networks since first is disabled. + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC) + networks = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(networks['networks']) + networks = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(networks['networks']) + self.assertEqual(0, num_hosta_nets) + self.assertEqual(2, num_hostc_nets) + + def test_network_auto_schedule_with_hosted(self): + # one agent hosts all the networks, other hosts none + with contextlib.nested(self.network(), + self.network()) as (net1, net2): + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + self._register_agent_states() + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + # second agent will not host the network since first has got it. + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC) + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(hosta_nets['networks']) + hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(hostc_nets['networks']) + + self.assertEqual(2, num_hosta_nets) + self.assertEqual(0, num_hostc_nets) + self.assertEqual(1, len(dhcp_agents['agents'])) + self.assertEqual(DHCP_HOSTA, dhcp_agents['agents'][0]['host']) + + def test_network_auto_schedule_with_hosted_2(self): + # one agent hosts one network + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + dhcp_hostc = copy.deepcopy(dhcp_hosta) + dhcp_hostc['host'] = DHCP_HOSTC + with self.network() as net1: + self._register_one_agent_state(dhcp_hosta) + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + with self.network() as net2: + self._register_one_agent_state(dhcp_hostc) + dhcp_rpc.get_active_networks(self.adminContext, + host=DHCP_HOSTC) + dhcp_agents_1 = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + dhcp_agents_2 = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(hosta_nets['networks']) + hostc_id = self._get_agent_id( + constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(hostc_nets['networks']) + + self.assertEqual(1, num_hosta_nets) + self.assertEqual(1, num_hostc_nets) + self.assertEqual(1, len(dhcp_agents_1['agents'])) + self.assertEqual(1, len(dhcp_agents_2['agents'])) + self.assertEqual(DHCP_HOSTA, dhcp_agents_1['agents'][0]['host']) + self.assertEqual(DHCP_HOSTC, dhcp_agents_2['agents'][0]['host']) + + def test_network_scheduling_on_port_creation(self): + with self.subnet() as subnet: + dhcp_agents = self._list_dhcp_agents_hosting_network( + subnet['subnet']['network_id']) + result0 = len(dhcp_agents['agents']) + self._register_agent_states() + with self.port(subnet=subnet, + device_owner="compute:test:" + DHCP_HOSTA) as port: + dhcp_agents = self._list_dhcp_agents_hosting_network( + port['port']['network_id']) + result1 = len(dhcp_agents['agents']) + self.assertEqual(0, result0) + self.assertEqual(1, result1) + + def test_network_scheduler_with_disabled_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + with self.network() as net1: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + agents = self._list_agents() + self._disable_agent(agents['agents'][0]['id']) + with self.network() as net2: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_scheduler_with_down_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + is_agent_down_str = 'quantum.db.agents_db.AgentDbMixin.is_agent_down' + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = False + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = True + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_scheduler_with_hosted_network(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + agents = self._list_agents() + with self.network() as net1: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + with mock.patch.object(OVSQuantumPluginV2, + 'get_dhcp_agents_hosting_networks', + autospec=True) as mock_hosting_agents: + + mock_hosting_agents.return_value = agents['agents'] + with self.network(do_delete=False) as net2: + pass + dhcp_agents = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_policy(self): + with self.network() as net1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + self._list_networks_hosted_by_dhcp_agent( + hosta_id, expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_network_to_dhcp_agent( + hosta_id, net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_network_to_dhcp_agent(hosta_id, + net1['network']['id']) + self._remove_network_from_dhcp_agent( + hosta_id, net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._list_dhcp_agents_hosting_network( + net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + + def test_network_add_to_dhcp_agent(self): + with self.network() as net1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + num_before_add = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self._add_network_to_dhcp_agent(hosta_id, + net1['network']['id']) + num_after_add = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self.assertEqual(0, num_before_add) + self.assertEqual(1, num_after_add) + + def test_network_remove_from_dhcp_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + with self.network() as net1: + num_before_remove = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self._remove_network_from_dhcp_agent(hosta_id, + net1['network']['id']) + num_after_remove = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self.assertEqual(1, num_before_remove) + self.assertEqual(0, num_after_remove) + + def test_router_auto_schedule_with_hosted(self): + with self.router() as router: + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + self._register_agent_states() + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, len(l3_agents['agents'])) + self.assertEqual(L3_HOSTA, l3_agents['agents'][0]['host']) + + def test_router_auto_schedule_with_hosted_2(self): + # one agent hosts one router + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': True, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + l3_hostb = copy.deepcopy(l3_hosta) + l3_hostb['host'] = L3_HOSTB + with self.router() as router1: + self._register_one_agent_state(l3_hosta) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + with self.router() as router2: + self._register_one_agent_state(l3_hostb) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + hostb_id = self._get_agent_id( + constants.AGENT_TYPE_L3, + L3_HOSTB) + hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id) + num_hostc_routers = len(hostb_routers['routers']) + + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, num_hostc_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(1, len(l3_agents_2['agents'])) + self.assertEqual(L3_HOSTA, l3_agents_1['agents'][0]['host']) + self.assertEqual(L3_HOSTB, l3_agents_2['agents'][0]['host']) + + def test_router_auto_schedule_with_disabled(self): + with contextlib.nested(self.router(), + self.router()): + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTB) + self._disable_agent(hosta_id) + # first agent will not host router since it is disabled + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + # second agent will host all the routers since first is disabled. + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id) + num_hostb_routers = len(hostb_routers['routers']) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + self.assertEqual(2, num_hostb_routers) + self.assertEqual(0, num_hosta_routers) + + def test_router_auto_schedule_with_candidates(self): + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': False, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + with contextlib.nested(self.router(), + self.router()) as (router1, router2): + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + l3_hosta['configurations']['router_id'] = router1['router']['id'] + self._register_one_agent_state(l3_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + # L3 agent will host only the compatible router. + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(0, len(l3_agents_2['agents'])) + + def test_router_schedule_with_candidates(self): + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': False, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + with contextlib.nested(self.router(), + self.router(), + self.subnet(), + self.subnet(cidr='10.0.3.0/24')) as (router1, + router2, + subnet1, + subnet2): + l3_hosta['configurations']['router_id'] = router1['router']['id'] + self._register_one_agent_state(l3_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._router_interface_action('add', + router1['router']['id'], + subnet1['subnet']['id'], + None) + self._router_interface_action('add', + router2['router']['id'], + subnet2['subnet']['id'], + None) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + # L3 agent will host only the compatible router. + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(0, len(l3_agents_2['agents'])) + + def test_router_without_l3_agents(self): + with self.subnet() as s: + self._set_net_external(s['subnet']['network_id']) + data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data['router']['name'] = 'router1' + data['router']['external_gateway_info'] = { + 'network_id': s['subnet']['network_id']} + router_req = self.new_create_request('routers', data, self.fmt) + res = router_req.get_response(self.ext_api) + router = self.deserialize(self.fmt, res) + l3agents = ( + self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers( + self.adminContext, [router['router']['id']])) + self._delete('routers', router['router']['id']) + self.assertEqual(0, len(l3agents)) + + def test_router_sync_data(self): + with contextlib.nested(self.subnet(), + self.subnet(cidr='10.0.2.0/24'), + self.subnet(cidr='10.0.3.0/24')) as ( + s1, s2, s3): + self._register_agent_states() + self._set_net_external(s1['subnet']['network_id']) + data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data['router']['name'] = 'router1' + data['router']['external_gateway_info'] = { + 'network_id': s1['subnet']['network_id']} + router_req = self.new_create_request('routers', data, self.fmt) + res = router_req.get_response(self.ext_api) + router = self.deserialize(self.fmt, res) + self._router_interface_action('add', + router['router']['id'], + s2['subnet']['id'], + None) + self._router_interface_action('add', + router['router']['id'], + s3['subnet']['id'], + None) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, len(l3agents['agents'])) + agents = self._list_agents() + another_l3_agent_id = None + another_l3_agent_host = None + default = l3agents['agents'][0]['id'] + for com in agents['agents']: + if (com['id'] != default and + com['agent_type'] == constants.AGENT_TYPE_L3): + another_l3_agent_id = com['id'] + another_l3_agent_host = com['host'] + break + self.assertTrue(another_l3_agent_id is not None) + self._add_router_to_l3_agent(another_l3_agent_id, + router['router']['id'], + expected_code=exc.HTTPConflict.code) + self._remove_router_from_l3_agent(default, + router['router']['id']) + self._add_router_to_l3_agent(another_l3_agent_id, + router['router']['id']) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(another_l3_agent_host, + l3agents['agents'][0]['host']) + self._remove_router_from_l3_agent(another_l3_agent_id, + router['router']['id']) + self._router_interface_action('remove', + router['router']['id'], + s2['subnet']['id'], + None) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, + len(l3agents['agents'])) + self._router_interface_action('remove', + router['router']['id'], + s3['subnet']['id'], + None) + self._delete('routers', router['router']['id']) + + def test_router_add_to_l3_agent(self): + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + num_before_add = len( + self._list_routers_hosted_by_l3_agent( + hosta_id)['routers']) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTB) + self._add_router_to_l3_agent(hostb_id, + router1['router']['id'], + expected_code=exc.HTTPConflict.code) + num_after_add = len( + self._list_routers_hosted_by_l3_agent( + hosta_id)['routers']) + self.assertEqual(0, num_before_add) + self.assertEqual(1, num_after_add) + + def test_router_policy(self): + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._list_routers_hosted_by_l3_agent( + hosta_id, expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_router_to_l3_agent( + hosta_id, router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_router_to_l3_agent( + hosta_id, router1['router']['id']) + self._remove_router_from_l3_agent( + hosta_id, router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._list_l3_agents_hosting_router( + router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + + +class L3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, + test_agent_ext_plugin.AgentDBTestMixIn, + AgentSchedulerTestMixIn, + test_plugin.QuantumDbPluginV2TestCase): + def setUp(self): + plugin = ('quantum.plugins.openvswitch.' + 'ovs_quantum_plugin.OVSQuantumPluginV2') + self.dhcp_notifier_cls_p = mock.patch( + 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.' + 'DhcpAgentNotifyAPI') + self.dhcp_notifier = mock.Mock(name='dhcp_notifier') + self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start() + self.dhcp_notifier_cls.return_value = self.dhcp_notifier + super(L3AgentNotifierTestCase, self).setUp(plugin) + ext_mgr = extensions.PluginAwareExtensionManager.get_instance() + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + self.adminContext = context.get_admin_context() + self.addCleanup(self.dhcp_notifier_cls_p.stop) + + def test_router_add_to_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + routers = plugin.get_sync_data(self.adminContext, + [router1['router']['id']]) + mock_l3.assert_called_with( + mock.ANY, + plugin.l3_agent_notifier.make_msg( + 'router_added_to_agent', + payload=routers), + topic='l3_agent.hosta') + + def test_router_remove_from_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + self._remove_router_from_l3_agent(hosta_id, + router1['router']['id']) + mock_l3.assert_called_with( + mock.ANY, plugin.l3_agent_notifier.make_msg( + 'router_removed_from_agent', + payload={'router_id': router1['router']['id']}), + topic='l3_agent.hosta') + + def test_agent_updated_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + mock_l3.assert_called_with( + mock.ANY, plugin.l3_agent_notifier.make_msg( + 'agent_updated', + payload={'admin_state_up': False}), + topic='l3_agent.hosta') + + +class AgentSchedulerTestCaseXML(AgentSchedulerTestCase): + fmt = 'xml' diff --git a/quantum/tests/unit/test_agent_ext_plugin.py b/quantum/tests/unit/test_agent_ext_plugin.py index f38b72c489..24acc4ea19 100644 --- a/quantum/tests/unit/test_agent_ext_plugin.py +++ b/quantum/tests/unit/test_agent_ext_plugin.py @@ -62,28 +62,17 @@ class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2, supported_extension_aliases = ["agent"] -class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): - fmt = 'json' - - def setUp(self): - self.adminContext = context.get_admin_context() - test_config['plugin_name_v2'] = ( - 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin') - # for these tests we need to enable overlapping ips - cfg.CONF.set_default('allow_overlapping_ips', True) - ext_mgr = AgentTestExtensionManager() - test_config['extension_manager'] = ext_mgr - super(AgentDBTestCase, self).setUp() +class AgentDBTestMixIn(object): def _list_agents(self, expected_res_status=None, quantum_context=None, query_string=None): - comp_res = self._list('agents', - quantum_context=quantum_context, - query_params=query_string) + agent_res = self._list('agents', + quantum_context=quantum_context, + query_params=query_string) if expected_res_status: - self.assertEqual(comp_res.status_int, expected_res_status) - return comp_res + self.assertEqual(agent_res.status_int, expected_res_status) + return agent_res def _register_agent_states(self): """Register two L3 agents and two DHCP agents.""" @@ -123,6 +112,21 @@ class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): agent_state={'agent_state': dhcp_hostc}) return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] + +class AgentDBTestCase(AgentDBTestMixIn, + test_db_plugin.QuantumDbPluginV2TestCase): + fmt = 'json' + + def setUp(self): + self.adminContext = context.get_admin_context() + test_config['plugin_name_v2'] = ( + 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin') + # for these tests we need to enable overlapping ips + cfg.CONF.set_default('allow_overlapping_ips', True) + ext_mgr = AgentTestExtensionManager() + test_config['extension_manager'] = ext_mgr + super(AgentDBTestCase, self).setUp() + def test_create_agent(self): data = {'agent': {}} _req = self.new_create_request('agents', data, self.fmt) diff --git a/quantum/tests/unit/test_db_rpc_base.py b/quantum/tests/unit/test_db_rpc_base.py index 994e3c82b3..136973e37c 100644 --- a/quantum/tests/unit/test_db_rpc_base.py +++ b/quantum/tests/unit/test_db_rpc_base.py @@ -25,7 +25,7 @@ class TestDhcpRpcCallackMixin(testtools.TestCase): super(TestDhcpRpcCallackMixin, self).setUp() self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin') get_plugin = self.plugin_p.start() - self.plugin = mock.Mock() + self.plugin = mock.MagicMock() get_plugin.return_value = self.plugin self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin() self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG') diff --git a/quantum/tests/unit/test_l3_agent.py b/quantum/tests/unit/test_l3_agent.py index 38bd901a0a..576b10f61d 100644 --- a/quantum/tests/unit/test_l3_agent.py +++ b/quantum/tests/unit/test_l3_agent.py @@ -88,7 +88,7 @@ class TestBasicRouterOperations(testtools.TestCase): def testRouterInfoCreate(self): id = _uuid() ri = l3_agent.RouterInfo(id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) self.assertTrue(ri.ns_name().endswith(id)) @@ -100,7 +100,7 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() network_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) cidr = '99.0.1.9/24' mac = 'ca:fe:de:ad:be:ef' @@ -128,7 +128,7 @@ class TestBasicRouterOperations(testtools.TestCase): def _test_external_gateway_action(self, action): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16'] ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', @@ -172,7 +172,7 @@ class TestBasicRouterOperations(testtools.TestCase): def _test_floating_ip_action(self, action): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) floating_ip = '20.0.0.100' fixed_ip = '10.0.0.23' @@ -227,7 +227,8 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, + None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) fake_route1 = {'destination': '135.207.0.0/16', @@ -274,7 +275,8 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, + None) ri.router = {} fake_old_routes = [] diff --git a/quantum/tests/unit/test_l3_plugin.py b/quantum/tests/unit/test_l3_plugin.py index 3e0e7b0d50..0eea653045 100644 --- a/quantum/tests/unit/test_l3_plugin.py +++ b/quantum/tests/unit/test_l3_plugin.py @@ -29,6 +29,7 @@ from webob import exc import webtest from quantum.api import extensions +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import config from quantum.common import constants as l3_constants @@ -37,7 +38,6 @@ from quantum.common.test_lib import test_config from quantum import context from quantum.db import db_base_plugin_v2 from quantum.db import l3_db -from quantum.db import l3_rpc_agent_api from quantum.db import models_v2 from quantum.extensions import l3 from quantum.manager import QuantumManager @@ -307,24 +307,7 @@ class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2, return super(TestL3NatPlugin, self).delete_port(context, id) -class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): - - def setUp(self): - test_config['plugin_name_v2'] = ( - 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin') - # for these tests we need to enable overlapping ips - cfg.CONF.set_default('allow_overlapping_ips', True) - ext_mgr = L3TestExtensionManager() - test_config['extension_manager'] = ext_mgr - super(L3NatTestCaseBase, self).setUp() - - # Set to None to reload the drivers - notifier_api._drivers = None - cfg.CONF.set_override("notification_driver", [test_notifier.__name__]) - - def tearDown(self): - test_notifier.NOTIFICATIONS = [] - super(L3NatTestCaseBase, self).tearDown() +class L3NatTestCaseMixin(object): def _create_network(self, fmt, name, admin_state_up, **kwargs): """ Override the routine for allowing the router:external attribute """ @@ -334,7 +317,7 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): kwargs), kwargs.values())) arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,) - return super(L3NatTestCaseBase, self)._create_network( + return super(L3NatTestCaseMixin, self)._create_network( fmt, name, admin_state_up, arg_list=arg_list, **new_args) def _create_router(self, fmt, tenant_id, name=None, @@ -505,6 +488,27 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): public_sub['subnet']['network_id']) +class L3NatTestCaseBase(L3NatTestCaseMixin, + test_db_plugin.QuantumDbPluginV2TestCase): + + def setUp(self): + test_config['plugin_name_v2'] = ( + 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin') + # for these tests we need to enable overlapping ips + cfg.CONF.set_default('allow_overlapping_ips', True) + ext_mgr = L3TestExtensionManager() + test_config['extension_manager'] = ext_mgr + super(L3NatTestCaseBase, self).setUp() + + # Set to None to reload the drivers + notifier_api._drivers = None + cfg.CONF.set_override("notification_driver", [test_notifier.__name__]) + + def tearDown(self): + test_notifier.NOTIFICATIONS = [] + super(L3NatTestCaseBase, self).tearDown() + + class L3NatDBTestCase(L3NatTestCaseBase): def test_router_create(self): @@ -1459,7 +1463,7 @@ class L3NatDBTestCase(L3NatTestCaseBase): def _test_notify_op_agent(self, target_func, *args): l3_rpc_agent_api_str = ( - 'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI') + 'quantum.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI') oldNotify = l3_rpc_agent_api.L3AgentNotify try: with mock.patch(l3_rpc_agent_api_str) as notifyApi: diff --git a/tox.ini b/tox.ini index 45261b2f0b..3b4fadeafd 100644 --- a/tox.ini +++ b/tox.ini @@ -19,8 +19,12 @@ sitepackages = True downloadcache = ~/cache/pip [testenv:pep8] +# E712 comparison to False should be 'if cond is False:' or 'if not cond:' +# query = query.filter(Component.disabled == False) +# E125 continuation line does not distinguish itself from next logical line + commands = - pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,openstack,*egg . + pep8 --repeat --show-source --ignore=E125,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg . pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin [testenv:i18n]