Agent management extension

1/3 part of blueprint quantum-scheduler

This patch adds agent management support
to l3-agent and plugin-agent (ovs and linuxbridge).

Change-Id: Iebc272f45c7530c995f32ef3729b11cd76779385
This commit is contained in:
gongysh 2013-02-01 20:30:12 +08:00
parent cc72d0ad11
commit 881884844d
19 changed files with 766 additions and 28 deletions

View File

@ -53,5 +53,10 @@
"create_qos_queue:": "rule:admin_only",
"get_qos_queue:": "rule:admin_only",
"get_qos_queues:": "rule:admin_only"
"get_qos_queues:": "rule:admin_only",
"update_agent": "rule:admin_only",
"delete_agent": "rule:admin_only",
"get_agent": "rule:admin_only",
"get_agents": "rule:admin_only"
}

View File

@ -190,6 +190,11 @@ notification_topics = notifications
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
# =========== items for agent management extension =============
# Seconds to regard the agent as down.
# agent_down_time = 5
# =========== end of items for agent management extension =====
[DEFAULT_SERVICETYPE]
# Description of the default service type (optional)
# description = "default service type"
@ -208,6 +213,13 @@ notification_topics = notifications
# Change to "sudo" to skip the filtering and just run the comand directly
# root_helper = sudo
# =========== items for agent management extension =============
# seconds between nodes reporting state to server, should be less than
# agent_down_time
# report_interval = 4
# =========== end of items for agent management extension =====
[keystone_authtoken]
auth_host = 127.0.0.1
auth_port = 35357

View File

@ -29,6 +29,11 @@ ROOT_HELPER_OPTS = [
help=_('Root helper application.')),
]
AGENT_STATE_OPTS = [
cfg.IntOpt('report_interval', default=4,
help=_('Seconds between nodes reporting state to server')),
]
def register_root_helper(conf):
# The first call is to ensure backward compatibility
@ -36,6 +41,10 @@ def register_root_helper(conf):
conf.register_opts(ROOT_HELPER_OPTS, 'AGENT')
def register_agent_state_opts_helper(conf):
conf.register_opts(AGENT_STATE_OPTS, 'AGENT')
def get_root_helper(conf):
root_helper = conf.AGENT.root_helper
if root_helper is not 'sudo':

View File

@ -32,12 +32,14 @@ from quantum.agent.linux import interface
from quantum.agent.linux import ip_lib
from quantum.agent.linux import iptables_manager
from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.common import constants as l3_constants
from quantum.common import topics
from quantum import context
from quantum import manager
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common import periodic_task
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common.rpc import proxy
@ -139,7 +141,7 @@ class L3NATAgent(manager.Manager):
help=_("UUID of external network for routers implemented "
"by the agents.")),
cfg.StrOpt('l3_agent_manager',
default='quantum.agent.l3_agent.L3NATAgent',
default='quantum.agent.l3_agent.L3NATAgentWithStateReport',
help=_("The Quantum L3 Agent manager.")),
]
@ -161,6 +163,7 @@ class L3NATAgent(manager.Manager):
LOG.exception(_("Error importing interface driver '%s'"),
self.conf.interface_driver)
sys.exit(1)
self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
self.fullsync = True
self.sync_sem = semaphore.Semaphore(1)
@ -211,8 +214,7 @@ class L3NATAgent(manager.Manager):
if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id
try:
return self.plugin_rpc.get_external_network_id(
context.get_admin_context())
return self.plugin_rpc.get_external_network_id(self.context)
except rpc_common.RemoteError as e:
if e.exc_type == 'TooManyExternalNetworks':
msg = _(
@ -617,15 +619,69 @@ class L3NATAgent(manager.Manager):
LOG.info(_("L3 agent started"))
class L3NATAgentWithStateReport(L3NATAgent):
def __init__(self, host, conf=None):
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.agent_state = {
'binary': 'quantum-l3-agent',
'host': host,
'topic': topics.L3_AGENT,
'configurations': {
'use_namespaces': self.conf.use_namespaces,
'router_id': self.conf.router_id,
'handle_internal_only_routers':
self.conf.handle_internal_only_routers,
'gateway_external_network_id':
self.conf.gateway_external_network_id,
'interface_driver': self.conf.interface_driver},
'start_flag': True,
'agent_type': l3_constants.AGENT_TYPE_L3}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.LoopingCall(self._report_state)
heartbeat.start(interval=report_interval)
def _report_state(self):
num_ex_gw_ports = 0
num_interfaces = 0
num_floating_ips = 0
router_infos = self.router_info.values()
num_routers = len(router_infos)
for ri in router_infos:
ex_gw_port = self._get_ex_gw_port(ri)
if ex_gw_port:
num_ex_gw_ports += 1
num_interfaces += len(ri.router.get(l3_constants.INTERFACE_KEY,
[]))
num_floating_ips += len(ri.router.get(l3_constants.FLOATINGIP_KEY,
[]))
configurations = self.agent_state['configurations']
configurations['routers'] = num_routers
configurations['ex_gw_ports'] = num_ex_gw_ports
configurations['interfaces'] = num_interfaces
configurations['floating_ips'] = num_floating_ips
try:
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_("Failed reporting state!"))
def main():
eventlet.monkey_patch()
conf = cfg.CONF
conf.register_opts(L3NATAgent.OPTS)
config.register_agent_state_opts_helper(conf)
config.register_root_helper(conf)
conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS)
conf()
config.setup_logging(conf)
server = quantum_service.Service.create(binary='quantum-l3-agent',
topic=topics.L3_AGENT)
server = quantum_service.Service.create(
binary='quantum-l3-agent',
topic=topics.L3_AGENT,
report_interval=cfg.CONF.AGENT.report_interval)
service.launch(server).wait()

View File

@ -49,6 +49,21 @@ def create_consumers(dispatcher, prefix, topic_details):
return connection
class PluginReportStateAPI(proxy.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginReportStateAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def report_state(self, context, agent_state):
return self.cast(context,
self.make_msg('report_state',
agent_state={'agent_state':
agent_state}),
topic=self.topic)
class PluginApi(proxy.RpcProxy):
'''Agent side of the rpc API.

View File

@ -52,3 +52,9 @@ TYPE_LONG = "long"
TYPE_FLOAT = "float"
TYPE_LIST = "list"
TYPE_DICT = "dict"
AGENT_TYPE_DHCP = 'DHCP agent'
AGENT_TYPE_OVS = 'Open vSwitch agent'
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_L3 = 'L3 agent'
L2_AGENT_TOPIC = 'N/A'

157
quantum/db/agents_db.py Normal file
View File

@ -0,0 +1,157 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy.orm import exc
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import agent as ext_agent
from quantum import manager
from quantum.openstack.common import cfg
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
cfg.CONF.register_opt(
cfg.IntOpt('agent_down_time', default=5,
help=_("Seconds to regard the agent is down.")))
class Agent(model_base.BASEV2, models_v2.HasId):
"""Represents agents running in quantum deployments"""
# L3 agent, DHCP agent, OVS agent, LinuxBridge
agent_type = sa.Column(sa.String(255), nullable=False)
binary = sa.Column(sa.String(255), nullable=False)
# TOPIC is a fanout exchange topic
topic = sa.Column(sa.String(255), nullable=False)
# TOPIC.host is a target topic
host = sa.Column(sa.String(255), nullable=False)
admin_state_up = sa.Column(sa.Boolean, default=True,
nullable=False)
# the time when first report came from agents
created_at = sa.Column(sa.DateTime, nullable=False)
# the time when first report came after agents start
started_at = sa.Column(sa.DateTime, nullable=False)
# updated when agents report
heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False)
# description is note for admin user
description = sa.Column(sa.String(255))
# configurations: a json dict string, I think 4095 is enough
configurations = sa.Column(sa.String(4095), nullable=False)
class AgentDbMixin(ext_agent.AgentPluginBase):
"""Mixin class to add agent extension to db_plugin_base_v2."""
def _get_agent(self, context, id):
try:
agent = self._get_by_id(context, Agent, id)
except exc.NoResultFound:
raise ext_agent.AgentNotFound(id=id)
return agent
def _is_agent_down(self, heart_beat_time_str):
return timeutils.is_older_than(heart_beat_time_str,
cfg.CONF.agent_down_time)
def _make_agent_dict(self, agent, fields=None):
attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
ext_agent.RESOURCE_NAME + 's')
res = dict((k, agent[k]) for k in attr
if k not in ['alive', 'configurations'])
res['alive'] = not self._is_agent_down(res['heartbeat_timestamp'])
try:
res['configurations'] = jsonutils.loads(agent['configurations'])
except Exception:
msg = _('Configurations for agent %(agent_type)s on host %(host)s'
' are invalid.')
LOG.warn(msg, {'agent_type': res['agent_type'],
'host': res['host']})
res['configurations'] = {}
return self._fields(res, fields)
def delete_agent(self, context, id):
with context.session.begin(subtransactions=True):
agent = self._get_agent(context, id)
context.session.delete(agent)
def update_agent(self, context, id, agent):
agent_data = agent['agent']
with context.session.begin(subtransactions=True):
agent = self._get_agent(context, id)
agent.update(agent_data)
return self._make_agent_dict(agent)
def get_agents(self, context, filters=None, fields=None):
return self._get_collection(context, Agent,
self._make_agent_dict,
filters=filters, fields=fields)
def _get_agent_by_type_and_host(self, context, agent_type, host):
query = self._model_query(context, Agent)
try:
agent_db = query.filter(Agent.agent_type == agent_type,
Agent.host == host).one()
return agent_db
except exc.NoResultFound:
raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type,
host=host)
except exc.MultipleResultsFound:
raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type,
host=host)
def get_agent(self, context, id, fields=None):
agent = self._get_agent(context, id)
return self._make_agent_dict(agent, fields)
def create_or_update_agent(self, context, agent):
"""Create or update agent according to report."""
with context.session.begin(subtransactions=True):
res_keys = ['agent_type', 'binary', 'host', 'topic']
res = dict((k, agent[k]) for k in res_keys)
configurations_dict = agent.get('configurations', {})
res['configurations'] = jsonutils.dumps(configurations_dict)
current_time = timeutils.utcnow()
try:
agent_db = self._get_agent_by_type_and_host(
context, agent['agent_type'], agent['host'])
res['heartbeat_timestamp'] = current_time
if agent.get('start_flag'):
res['started_at'] = current_time
agent_db.update(res)
except ext_agent.AgentNotFoundByTypeHost:
res['created_at'] = current_time
res['started_at'] = current_time
res['heartbeat_timestamp'] = current_time
res['admin_state_up'] = True
agent_db = Agent(**res)
context.session.add(agent_db)
class AgentExtRpcCallback(object):
"""Processes the rpc report in plugin implementations."""
RPC_API_VERSION = '1.0'
def report_state(self, context, **kwargs):
"""Report state from agent to server. """
agent_state = kwargs['agent_state']['agent_state']
plugin = manager.QuantumManager.get_plugin()
plugin.create_or_update_agent(context, agent_state)

View File

@ -0,0 +1,73 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Add agent management extension model support
Revision ID: 511471cc46b
Revises: 54c2c487e913
Create Date: 2013-02-18 05:09:32.523460
"""
# revision identifiers, used by Alembic.
revision = '511471cc46b'
down_revision = '54c2c487e913'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
]
from alembic import op
import sqlalchemy as sa
from quantum.db import migration
def upgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
### commands auto generated by Alembic - please adjust! ###
op.create_table(
'agents',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('agent_type', sa.String(length=255), nullable=False),
sa.Column('binary', sa.String(length=255), nullable=False),
sa.Column('topic', sa.String(length=255), nullable=False),
sa.Column('host', sa.String(length=255), nullable=False),
sa.Column('admin_state_up', sa.Boolean(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('heartbeat_timestamp', sa.DateTime(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('configurations', sa.String(length=4095), nullable=False),
sa.PrimaryKeyConstraint('id')
)
### end Alembic commands ###
def downgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
### commands auto generated by Alembic - please adjust! ###
op.drop_table('agents')
### end Alembic commands ###

161
quantum/extensions/agent.py Normal file
View File

@ -0,0 +1,161 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
from quantum.api import extensions
from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
from quantum.common import exceptions
from quantum import manager
# Attribute Map
RESOURCE_NAME = 'agent'
RESOURCE_ATTRIBUTE_MAP = {
RESOURCE_NAME + 's': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'agent_type': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'binary': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'topic': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'host': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'admin_state_up': {'allow_post': False, 'allow_put': True,
'convert_to': attr.convert_to_boolean,
'is_visible': True},
'created_at': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'started_at': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'heartbeat_timestamp': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'alive': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'configurations': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'description': {'allow_post': False, 'allow_put': True,
'is_visible': True,
'validate': {'type:string': None}},
},
}
class AgentNotFound(exceptions.NotFound):
message = _("Agent %(id)s could not be found")
class AgentNotFoundByTypeHost(exceptions.NotFound):
message = _("Agent with agent_type=%(agent_type)s and host=%(host)s "
"could not be found")
class MultipleAgentFoundByTypeHost(exceptions.Conflict):
message = _("Multiple agents with agent_type=%(agent_type)s and "
"host=%(host)s found")
class Agent(object):
"""Agent management extension"""
@classmethod
def get_name(cls):
return "agent"
@classmethod
def get_alias(cls):
return "agent"
@classmethod
def get_description(cls):
return "The agent management extension."
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/agent/api/v2.0"
@classmethod
def get_updated(cls):
return "2013-02-03T10:00:00-00:00"
@classmethod
def get_resources(cls):
""" Returns Ext Resources """
my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
attr.PLURALS.update(dict(my_plurals))
plugin = manager.QuantumManager.get_plugin()
params = RESOURCE_ATTRIBUTE_MAP.get(RESOURCE_NAME + 's')
controller = base.create_resource(RESOURCE_NAME + 's',
RESOURCE_NAME,
plugin, params
)
ex = extensions.ResourceExtension(RESOURCE_NAME + 's',
controller)
return [ex]
def get_extended_resources(self, version):
return {}
class AgentPluginBase(object):
""" REST API to operate the Agent.
All of method must be in an admin context.
"""
def create_agent(self, context, agent):
""" Create agent.
This operation is not allow in REST API.
@raise exceptions.BadRequest:
"""
raise exceptions.BadRequest
@abstractmethod
def delete_agent(self, context, id):
"""Delete agent.
Agents register themselves on reporting state.
But if a agent does not report its status
for a long time (for example, it is dead for ever. ),
admin can remove it. Agents must be disabled before
being removed.
"""
pass
@abstractmethod
def update_agent(self, context, agent):
"""Disable or Enable the agent.
Discription also can be updated.
Some agents cannot be disabled,
such as plugins, services.
An error code should be reported in this case.
@raise exceptions.BadRequest:
"""
pass
@abstractmethod
def get_agents(self, context, filters=None, fields=None):
pass
@abstractmethod
def get_agent(self, context, id, fields=None):
pass

View File

@ -35,10 +35,12 @@ from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
from quantum.common import constants
from quantum.common import topics
from quantum.common import utils as q_utils
from quantum import context
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.linuxbridge.common import config
@ -463,9 +465,27 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
self.polling_interval = polling_interval
self.root_helper = root_helper
self.setup_linux_bridge(interface_mappings)
self.agent_state = {
'binary': 'quantum-linuxbridge-agent',
'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC,
'configurations': interface_mappings,
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
self.setup_rpc(interface_mappings.values())
self.init_firewall()
def _report_state(self):
try:
devices = len(self.br_mgr.udev_get_tap_devices())
self.agent_state.get('configurations')['devices'] = devices
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception("Failed reporting state!")
def setup_rpc(self, physical_interfaces):
if physical_interfaces:
mac = utils.get_interface_mac(physical_interfaces[0])
@ -482,7 +502,7 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
self.topic = topics.AGENT
self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
@ -496,6 +516,10 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.LoopingCall(self._report_state)
heartbeat.start(interval=report_interval)
def setup_linux_bridge(self, interface_mappings):
self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper)

View File

@ -51,4 +51,5 @@ agent_opts = [
cfg.CONF.register_opts(vlan_opts, "VLANS")
cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE")
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)

View File

@ -24,6 +24,7 @@ from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.common import utils
from quantum.db import agents_db
from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
@ -48,12 +49,13 @@ LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin
):
RPC_API_VERSION = '1.1'
# Device names start with "tap"
# history
# 1.1 Support Security Group RPC
RPC_API_VERSION = '1.1'
# Device names start with "tap"
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
@ -62,7 +64,8 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self])
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
@classmethod
def get_port_from_device(cls, device):
@ -170,7 +173,8 @@ class AgentNotifierApi(proxy.RpcProxy,
class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
sg_db_rpc.SecurityGroupServerRpcMixin,
agents_db.AgentDbMixin):
"""Implement the Quantum abstractions using Linux bridging.
A new VLAN is created for each network. An agent is relied upon
@ -193,7 +197,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
__native_bulk_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas",
"security-group"]
"security-group", "agent"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"

View File

@ -32,10 +32,12 @@ from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
from quantum.common import constants as q_const
from quantum.common import topics
from quantum.common import utils as q_utils
from quantum import context
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
from quantum.extensions import securitygroup as ext_sg
@ -176,7 +178,13 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self.tunnel_count = 0
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
self.agent_state = {
'binary': 'quantum-openvswitch-agent',
'host': cfg.CONF.host,
'topic': q_const.L2_AGENT_TOPIC,
'configurations': bridge_mappings,
'agent_type': q_const.AGENT_TYPE_OVS,
'start_flag': True}
self.setup_rpc(integ_br)
# Security group agent supprot
@ -184,11 +192,24 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self.plugin_rpc,
root_helper)
def _report_state(self):
try:
# How many devices are likely used by a VM
ports = self.int_br.get_vif_port_set()
num_devices = len(ports)
self.agent_state.get('configurations')['devices'] = num_devices
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_("Failed reporting state!"))
def setup_rpc(self, integ_br):
mac = utils.get_interface_mac(integ_br)
self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
@ -202,6 +223,10 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.LoopingCall(self._report_state)
heartbeat.start(interval=report_interval)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():

View File

@ -62,4 +62,5 @@ agent_opts = [
cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)

View File

@ -30,6 +30,7 @@ from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import agents_db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
@ -71,7 +72,8 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self])
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
@classmethod
def get_port_from_device(cls, device):
@ -208,7 +210,9 @@ class AgentNotifierApi(proxy.RpcProxy,
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
sg_db_rpc.SecurityGroupServerRpcMixin,
agents_db.AgentDbMixin):
"""Implement the Quantum abstractions using Open vSwitch.
Depending on whether tunneling is enabled, either a GRE tunnel or
@ -231,7 +235,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# is qualified by class
__native_bulk_support = True
supported_extension_aliases = ["provider", "router",
"binding", "quotas", "security-group"]
"binding", "quotas", "security-group",
"agent"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"

View File

@ -34,9 +34,6 @@ from quantum import wsgi
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help=_('Seconds between nodes reporting state to datastore')),
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),

View File

@ -50,6 +50,7 @@ class TestOvsQuantumAgent(unittest.TestCase):
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('report_interval', 0, 'AGENT')
kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
'OVSQuantumAgent.setup_integration_br',
@ -160,14 +161,19 @@ class TestOvsQuantumAgent(unittest.TestCase):
'admin_state_up': True}
with mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value='2'):
with mock.patch.object(self.agent.plugin_rpc,
'update_device_up') as device_up:
with mock.patch.object(self.agent, 'port_bound') as port_bound:
self.agent.port_update(mock.Mock(), port=port)
self.assertTrue(port_bound.called)
self.assertTrue(device_up.called)
with mock.patch.object(self.agent.plugin_rpc,
'update_device_down') as device_down:
with mock.patch.object(self.agent, 'port_dead') as port_dead:
port['admin_state_up'] = False
self.agent.port_update(mock.Mock(), port=port)
self.assertTrue(port_dead.called)
self.assertTrue(device_down.called)
def test_process_network_ports(self):
reply = {'current': set(['tap0']),

View File

@ -64,6 +64,7 @@ class TunnelTest(unittest.TestCase):
def setUp(self):
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('report_interval', 0, 'AGENT')
self.mox = mox.Mox()
self.INT_BRIDGE = 'integration_bridge'
self.TUN_BRIDGE = 'tunnel_bridge'

View File

@ -0,0 +1,180 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import time
from webob import exc
from quantum.common import constants
from quantum.common import topics
from quantum.common.test_lib import test_config
from quantum import context
from quantum.db import agents_db
from quantum.db import db_base_plugin_v2
from quantum.extensions import agent
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_db_plugin
LOG = logging.getLogger(__name__)
_uuid = uuidutils.generate_uuid
_get_path = test_api_v2._get_path
L3_HOSTA = 'hosta'
DHCP_HOSTA = 'hosta'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
class AgentTestExtensionManager(object):
def get_resources(self):
return agent.Agent.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
# This plugin class is just for testing
class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2,
agents_db.AgentDbMixin):
supported_extension_aliases = ["agent"]
class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
fmt = 'json'
def setUp(self):
self.adminContext = context.get_admin_context()
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = AgentTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(AgentDBTestCase, self).setUp()
def _list_agents(self, expected_res_status=None,
quantum_context=None,
query_string=None):
comp_res = self._list('agents',
quantum_context=quantum_context,
query_params=query_string)
if expected_res_status:
self.assertEqual(comp_res.status_int, expected_res_status)
return comp_res
def _register_agent_states(self):
"""Register two L3 agents and two DHCP agents."""
l3_hosta = {
'binary': 'quantum-l3-agent',
'host': L3_HOSTA,
'topic': topics.L3_AGENT,
'configurations': {'use_namespaces': True,
'router_id': None,
'handle_internal_only_routers':
True,
'gateway_external_network_id':
None,
'interface_driver': 'interface_driver',
},
'agent_type': constants.AGENT_TYPE_L3}
l3_hostb = copy.deepcopy(l3_hosta)
l3_hostb['host'] = L3_HOSTB
dhcp_hosta = {
'binary': 'quantum-dhcp-agent',
'host': DHCP_HOSTA,
'topic': 'DHCP_AGENT',
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True,
},
'agent_type': constants.AGENT_TYPE_DHCP}
dhcp_hostc = copy.deepcopy(dhcp_hosta)
dhcp_hostc['host'] = DHCP_HOSTC
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': l3_hosta})
callback.report_state(self.adminContext,
agent_state={'agent_state': l3_hostb})
callback.report_state(self.adminContext,
agent_state={'agent_state': dhcp_hosta})
callback.report_state(self.adminContext,
agent_state={'agent_state': dhcp_hostc})
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
def test_create_agent(self):
data = {'agent': {}}
_req = self.new_create_request('agents', data, self.fmt)
_req.environ['quantum.context'] = context.Context(
'', 'tenant_id')
res = _req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_list_agent(self):
agents = self._register_agent_states()
res = self._list('agents')
for agent in res['agents']:
if (agent['host'] == DHCP_HOSTA and
agent['agent_type'] == constants.AGENT_TYPE_DHCP):
self.assertEqual(
'dhcp_driver',
agent['configurations']['dhcp_driver'])
break
self.assertEqual(len(agents), len(res['agents']))
def test_show_agent(self):
self._register_agent_states()
agents = self._list_agents(
query_string='binary=quantum-l3-agent')
self.assertEqual(2, len(agents['agents']))
agent = self._show('agents', agents['agents'][0]['id'])
self.assertEqual('quantum-l3-agent', agent['agent']['binary'])
def test_update_agent(self):
self._register_agent_states()
agents = self._list_agents(
query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
self.assertEqual(1, len(agents['agents']))
com_id = agents['agents'][0]['id']
agent = self._show('agents', com_id)
new_agent = {}
new_agent['agent'] = {}
new_agent['agent']['admin_state_up'] = False
new_agent['agent']['description'] = 'description'
self._update('agents', com_id, new_agent)
agent = self._show('agents', com_id)
self.assertFalse(agent['agent']['admin_state_up'])
self.assertEqual('description', agent['agent']['description'])
def test_dead_agent(self):
cfg.CONF.set_override('agent_down_time', 1)
self._register_agent_states()
time.sleep(1.5)
agents = self._list_agents(
query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
self.assertFalse(agents['agents'][0]['alive'])
class AgentDBTestCaseXML(AgentDBTestCase):
fmt = 'xml'