Modify dhcp agent for agent management extension
2nd part of blueprint quantum-scheduler Remove openstack openstack listener on DHCP agent side. Add DHCPagent notifier on quantum server side. Change-Id: I196691650a99ba865bf06081a1fc4546f9fac7bd
This commit is contained in:
parent
4104dc004a
commit
4126d593e7
@ -36,6 +36,7 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
|
||||
# be activated when the subnet gateway_ip is None. The guest instance must
|
||||
# be configured to request host routes via DHCP (Option 121).
|
||||
# enable_isolated_metadata = False
|
||||
|
||||
# Allows for serving metadata requests coming from a dedicated metadata
|
||||
# access network whose cidr is 169.254.169.254/16 (or larger prefix), and
|
||||
# is connected to a Quantum router from which the VMs send metadata
|
||||
@ -43,3 +44,6 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
|
||||
# they will be able to reach 169.254.169.254 through a router.
|
||||
# This option requires enable_isolated_metadata = True
|
||||
# enable_metadata_network = False
|
||||
|
||||
# The Quantum DHCP agent manager.
|
||||
# dhcp_agent_manager = quantum.agent.dhcp_agent.DhcpAgentWithStateReport
|
||||
|
@ -64,6 +64,9 @@ api_paste_config = api-paste.ini
|
||||
# DHCP Lease duration (in seconds)
|
||||
# dhcp_lease_duration = 120
|
||||
|
||||
# Allow sending resource operation notification to DHCP agent
|
||||
# dhcp_agent_notification = True
|
||||
|
||||
# Enable or disable bulk create/update/delete operations
|
||||
# allow_bulk = True
|
||||
# Enable or disable overlapping IPs for subnets
|
||||
|
@ -33,11 +33,15 @@ from quantum.common import constants
|
||||
from quantum.common import exceptions
|
||||
from quantum.common import topics
|
||||
from quantum import context
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import jsonutils
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import loopingcall
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
from quantum.openstack.common import service
|
||||
from quantum.openstack.common import uuidutils
|
||||
from quantum import service as quantum_service
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
NS_PREFIX = 'qdhcp-'
|
||||
@ -46,7 +50,7 @@ METADATA_DEFAULT_IP = '169.254.169.254/%d' % METADATA_DEFAULT_PREFIX
|
||||
METADATA_PORT = 80
|
||||
|
||||
|
||||
class DhcpAgent(object):
|
||||
class DhcpAgent(manager.Manager):
|
||||
OPTS = [
|
||||
cfg.IntOpt('resync_interval', default=5,
|
||||
help=_("Interval to resync.")),
|
||||
@ -60,29 +64,34 @@ class DhcpAgent(object):
|
||||
cfg.BoolOpt('enable_metadata_network', default=False,
|
||||
help=_("Allows for serving metadata requests from a "
|
||||
"dedicate network. Requires "
|
||||
"enable isolated_metadata = True "))
|
||||
"enable isolated_metadata = True ")),
|
||||
cfg.StrOpt('dhcp_agent_manager',
|
||||
default='quantum.agent.dhcp_agent.'
|
||||
'DhcpAgentWithStateReport',
|
||||
help=_("The Quantum DHCP agent manager.")),
|
||||
]
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, host=None):
|
||||
super(DhcpAgent, self).__init__(host=host)
|
||||
self.needs_resync = False
|
||||
self.conf = conf
|
||||
self.conf = cfg.CONF
|
||||
self.cache = NetworkCache()
|
||||
self.root_helper = config.get_root_helper(conf)
|
||||
|
||||
self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
|
||||
self.root_helper = config.get_root_helper(self.conf)
|
||||
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
|
||||
|
||||
self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
|
||||
self.notifications = agent_rpc.NotificationDispatcher()
|
||||
self.lease_relay = DhcpLeaseRelay(self.update_lease)
|
||||
|
||||
def after_start(self):
|
||||
self.run()
|
||||
LOG.info(_("DHCP agent started"))
|
||||
|
||||
def run(self):
|
||||
"""Activate the DHCP agent."""
|
||||
self.sync_state()
|
||||
self.periodic_resync()
|
||||
self.lease_relay.start()
|
||||
self.notifications.run_dispatch(self)
|
||||
|
||||
def _ns_name(self, network):
|
||||
if self.conf.use_namespaces:
|
||||
@ -199,12 +208,12 @@ class DhcpAgent(object):
|
||||
else:
|
||||
self.disable_dhcp_helper(network.id)
|
||||
|
||||
def network_create_end(self, payload):
|
||||
def network_create_end(self, context, payload):
|
||||
"""Handle the network.create.end notification event."""
|
||||
network_id = payload['network']['id']
|
||||
self.enable_dhcp_helper(network_id)
|
||||
|
||||
def network_update_end(self, payload):
|
||||
def network_update_end(self, context, payload):
|
||||
"""Handle the network.update.end notification event."""
|
||||
network_id = payload['network']['id']
|
||||
if payload['network']['admin_state_up']:
|
||||
@ -212,11 +221,11 @@ class DhcpAgent(object):
|
||||
else:
|
||||
self.disable_dhcp_helper(network_id)
|
||||
|
||||
def network_delete_end(self, payload):
|
||||
def network_delete_end(self, context, payload):
|
||||
"""Handle the network.delete.end notification event."""
|
||||
self.disable_dhcp_helper(payload['network_id'])
|
||||
|
||||
def subnet_update_end(self, payload):
|
||||
def subnet_update_end(self, context, payload):
|
||||
"""Handle the subnet.update.end notification event."""
|
||||
network_id = payload['subnet']['network_id']
|
||||
self.refresh_dhcp_helper(network_id)
|
||||
@ -224,14 +233,14 @@ class DhcpAgent(object):
|
||||
# Use the update handler for the subnet create event.
|
||||
subnet_create_end = subnet_update_end
|
||||
|
||||
def subnet_delete_end(self, payload):
|
||||
def subnet_delete_end(self, context, payload):
|
||||
"""Handle the subnet.delete.end notification event."""
|
||||
subnet_id = payload['subnet_id']
|
||||
network = self.cache.get_network_by_subnet_id(subnet_id)
|
||||
if network:
|
||||
self.refresh_dhcp_helper(network.id)
|
||||
|
||||
def port_update_end(self, payload):
|
||||
def port_update_end(self, context, payload):
|
||||
"""Handle the port.update.end notification event."""
|
||||
port = DictModel(payload['port'])
|
||||
network = self.cache.get_network_by_id(port.network_id)
|
||||
@ -242,7 +251,7 @@ class DhcpAgent(object):
|
||||
# Use the update handler for the port create event.
|
||||
port_create_end = port_update_end
|
||||
|
||||
def port_delete_end(self, payload):
|
||||
def port_delete_end(self, context, payload):
|
||||
"""Handle the port.delete.end notification event."""
|
||||
port = self.cache.get_port_by_id(payload['port_id'])
|
||||
if port:
|
||||
@ -434,6 +443,19 @@ class NetworkCache(object):
|
||||
if port.id == port_id:
|
||||
return port
|
||||
|
||||
def get_state(self):
|
||||
net_ids = self.get_network_ids()
|
||||
num_nets = len(net_ids)
|
||||
num_subnets = 0
|
||||
num_ports = 0
|
||||
for net_id in net_ids:
|
||||
network = self.get_network_by_id(net_id)
|
||||
num_subnets += len(network.subnets)
|
||||
num_ports += len(network.ports)
|
||||
return {'networks': num_nets,
|
||||
'subnets': num_subnets,
|
||||
'ports': num_ports}
|
||||
|
||||
|
||||
class DeviceManager(object):
|
||||
OPTS = [
|
||||
@ -626,9 +648,46 @@ class DhcpLeaseRelay(object):
|
||||
eventlet.spawn(eventlet.serve, listener, self._handler)
|
||||
|
||||
|
||||
class DhcpAgentWithStateReport(DhcpAgent):
|
||||
def __init__(self, host=None):
|
||||
super(DhcpAgentWithStateReport, self).__init__(host=host)
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
|
||||
self.agent_state = {
|
||||
'binary': 'quantum-dhcp-agent',
|
||||
'host': host,
|
||||
'topic': topics.DHCP_AGENT,
|
||||
'configurations': {
|
||||
'dhcp_driver': cfg.CONF.dhcp_driver,
|
||||
'use_namespaces': cfg.CONF.use_namespaces,
|
||||
'dhcp_lease_time': cfg.CONF.dhcp_lease_time},
|
||||
'start_flag': True,
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
if report_interval:
|
||||
heartbeat = loopingcall.LoopingCall(self._report_state)
|
||||
heartbeat.start(interval=report_interval)
|
||||
|
||||
def _report_state(self):
|
||||
try:
|
||||
self.agent_state.get('configurations').update(
|
||||
self.cache.get_state())
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.state_rpc.report_state(ctx,
|
||||
self.agent_state)
|
||||
except Exception:
|
||||
LOG.exception(_("Failed reporting state!"))
|
||||
return
|
||||
if self.agent_state.pop('start_flag', None):
|
||||
self.run()
|
||||
|
||||
def after_start(self):
|
||||
LOG.info(_("DHCP agent started"))
|
||||
|
||||
|
||||
def main():
|
||||
eventlet.monkey_patch()
|
||||
cfg.CONF.register_opts(DhcpAgent.OPTS)
|
||||
config.register_agent_state_opts_helper(cfg.CONF)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
cfg.CONF.register_opts(DeviceManager.OPTS)
|
||||
cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
|
||||
@ -636,6 +695,8 @@ def main():
|
||||
cfg.CONF.register_opts(interface.OPTS)
|
||||
cfg.CONF(project='quantum')
|
||||
config.setup_logging(cfg.CONF)
|
||||
|
||||
mgr = DhcpAgent(cfg.CONF)
|
||||
mgr.run()
|
||||
server = quantum_service.Service.create(
|
||||
binary='quantum-dhcp-agent',
|
||||
topic=topics.DHCP_AGENT,
|
||||
report_interval=cfg.CONF.AGENT.report_interval)
|
||||
service.launch(server).wait()
|
||||
|
@ -100,34 +100,3 @@ class PluginApi(proxy.RpcProxy):
|
||||
return self.call(context,
|
||||
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
|
||||
topic=self.topic)
|
||||
|
||||
|
||||
class NotificationDispatcher(object):
|
||||
def __init__(self):
|
||||
# Set the Queue size to 1 so that messages stay on server rather than
|
||||
# being buffered in the process.
|
||||
self.queue = eventlet.queue.Queue(1)
|
||||
self.connection = rpc.create_connection(new=True)
|
||||
topic = '%s.%s' % (rpc_notifier.CONF.notification_topics[0],
|
||||
api.CONF.default_notification_level.lower())
|
||||
queue_name = 'notification_listener_%s' % uuidutils.generate_uuid()
|
||||
self.connection.declare_topic_consumer(topic=topic,
|
||||
queue_name=queue_name,
|
||||
callback=self._add_to_queue)
|
||||
self.connection.consume_in_thread()
|
||||
|
||||
def _add_to_queue(self, msg):
|
||||
self.queue.put(msg)
|
||||
|
||||
def run_dispatch(self, handler):
|
||||
while True:
|
||||
msg = self.queue.get()
|
||||
name = msg['event_type'].replace('.', '_')
|
||||
|
||||
try:
|
||||
if hasattr(handler, name):
|
||||
getattr(handler, name)(msg['payload'])
|
||||
else:
|
||||
LOG.debug(_('Unknown event_type: %s.'), msg['event_type'])
|
||||
except Exception, e:
|
||||
LOG.warn(_('Error processing message. Exception: %s'), e)
|
||||
|
14
quantum/api/rpc/__init__.py
Normal file
14
quantum/api/rpc/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
14
quantum/api/rpc/agentnotifiers/__init__.py
Normal file
14
quantum/api/rpc/agentnotifiers/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
70
quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
Normal file
70
quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
Normal file
@ -0,0 +1,70 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import topics
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
"""API for plugin to notify DHCP agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
# It seems dhcp agent does not support bulk operation
|
||||
VALID_RESOURCES = ['network', 'subnet', 'port']
|
||||
VALID_METHOD_NAMES = ['network.create.end',
|
||||
'network.update.end',
|
||||
'network.delete.end',
|
||||
'subnet.create.end',
|
||||
'subnet.update.end',
|
||||
'subnet.delete.end',
|
||||
'port.create.end',
|
||||
'port.update.end',
|
||||
'port.delete.end']
|
||||
|
||||
def __init__(self, topic=topics.DHCP_AGENT):
|
||||
super(DhcpAgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification(self, context, method, payload):
|
||||
"""Notify all the agents that are hosting the network"""
|
||||
# By now, we have no scheduling feature, so we fanout
|
||||
# to all of the DHCP agents
|
||||
self._notification_fanout(context, method, payload)
|
||||
|
||||
def _notification_fanout(self, context, method, payload):
|
||||
"""Fanout the payload to all dhcp agents"""
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic=topics.DHCP_AGENT)
|
||||
|
||||
def notify(self, context, data, methodname):
|
||||
# data is {'key' : 'value'} with only one key
|
||||
if methodname not in self.VALID_METHOD_NAMES:
|
||||
return
|
||||
obj_type = data.keys()[0]
|
||||
if obj_type not in self.VALID_RESOURCES:
|
||||
return
|
||||
obj_value = data[obj_type]
|
||||
methodname = methodname.replace(".", "_")
|
||||
if methodname.endswith("_delete_end"):
|
||||
if 'id' in obj_value:
|
||||
self._notification(context, methodname,
|
||||
{obj_type + '_id': obj_value['id']})
|
||||
else:
|
||||
self._notification(context, methodname, data)
|
@ -18,6 +18,9 @@
|
||||
import netaddr
|
||||
import webob.exc
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from quantum.api.v2 import attributes
|
||||
from quantum.api.v2 import resource as wsgi_resource
|
||||
from quantum.common import exceptions
|
||||
@ -94,6 +97,7 @@ class Controller(object):
|
||||
self._policy_attrs = [name for (name, info) in self._attr_info.items()
|
||||
if info.get('required_by_policy')]
|
||||
self._publisher_id = notifier_api.publisher_id('network')
|
||||
self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self._member_actions = member_actions
|
||||
|
||||
if parent:
|
||||
@ -193,6 +197,10 @@ class Controller(object):
|
||||
policy.enforce(request.context, action, obj, plugin=self._plugin)
|
||||
return obj
|
||||
|
||||
def _send_dhcp_notification(self, context, data, methodname):
|
||||
if cfg.CONF.dhcp_agent_notification:
|
||||
self._dhcp_agent_notifier.notify(context, data, methodname)
|
||||
|
||||
def index(self, request, **kwargs):
|
||||
"""Returns a list of the requested entity"""
|
||||
parent_id = kwargs.get(self._parent_id_name)
|
||||
@ -298,11 +306,15 @@ class Controller(object):
|
||||
**kwargs)
|
||||
|
||||
def notify(create_result):
|
||||
notifier_method = self._resource + '.create.end'
|
||||
notifier_api.notify(request.context,
|
||||
self._publisher_id,
|
||||
self._resource + '.create.end',
|
||||
notifier_method,
|
||||
notifier_api.CONF.default_notification_level,
|
||||
create_result)
|
||||
self._send_dhcp_notification(request.context,
|
||||
create_result,
|
||||
notifier_method)
|
||||
return create_result
|
||||
|
||||
kwargs = {self._parent_id_name: parent_id} if parent_id else {}
|
||||
@ -348,11 +360,16 @@ class Controller(object):
|
||||
|
||||
obj_deleter = getattr(self._plugin, action)
|
||||
obj_deleter(request.context, id, **kwargs)
|
||||
notifier_method = self._resource + '.delete.end'
|
||||
notifier_api.notify(request.context,
|
||||
self._publisher_id,
|
||||
self._resource + '.delete.end',
|
||||
notifier_method,
|
||||
notifier_api.CONF.default_notification_level,
|
||||
{self._resource + '_id': id})
|
||||
result = {self._resource: self._view(obj)}
|
||||
self._send_dhcp_notification(request.context,
|
||||
result,
|
||||
notifier_method)
|
||||
|
||||
def update(self, request, id, body=None, **kwargs):
|
||||
"""Updates the specified entity's attributes"""
|
||||
@ -398,11 +415,15 @@ class Controller(object):
|
||||
kwargs[self._parent_id_name] = parent_id
|
||||
obj = obj_updater(request.context, id, **kwargs)
|
||||
result = {self._resource: self._view(obj)}
|
||||
notifier_method = self._resource + '.update.end'
|
||||
notifier_api.notify(request.context,
|
||||
self._publisher_id,
|
||||
self._resource + '.update.end',
|
||||
notifier_method,
|
||||
notifier_api.CONF.default_notification_level,
|
||||
result)
|
||||
self._send_dhcp_notification(request.context,
|
||||
result,
|
||||
notifier_method)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
|
@ -62,6 +62,9 @@ core_opts = [
|
||||
help=_("Maximum number of host routes per subnet")),
|
||||
cfg.IntOpt('dhcp_lease_duration', default=120,
|
||||
help=_("DHCP lease duration")),
|
||||
cfg.BoolOpt('dhcp_agent_notification', default=True,
|
||||
help=_("Allow sending resource operation"
|
||||
" notification to DHCP agent")),
|
||||
cfg.BoolOpt('allow_overlapping_ips', default=False,
|
||||
help=_("Allow overlapping IP support in Quantum")),
|
||||
cfg.StrOpt('host', default=utils.get_hostname(),
|
||||
|
@ -27,6 +27,7 @@ PLUGIN = 'q-plugin'
|
||||
DHCP = 'q-dhcp-notifer'
|
||||
|
||||
L3_AGENT = 'l3_agent'
|
||||
DHCP_AGENT = 'dhcp_agent'
|
||||
|
||||
|
||||
def get_topic_name(prefix, table, operation):
|
||||
|
@ -62,84 +62,3 @@ class AgentRPCMethods(unittest.TestCase):
|
||||
with mock.patch(call_to_patch) as create_connection:
|
||||
conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
|
||||
create_connection.assert_has_calls(expected)
|
||||
|
||||
|
||||
class AgentRPCNotificationDispatcher(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.create_connection_p = mock.patch(
|
||||
'quantum.openstack.common.rpc.create_connection')
|
||||
self.create_connection = self.create_connection_p.start()
|
||||
cfg.CONF.set_override('default_notification_level', 'INFO')
|
||||
cfg.CONF.set_override('notification_topics', ['notifications'])
|
||||
|
||||
def tearDown(self):
|
||||
self.create_connection_p.stop()
|
||||
cfg.CONF.reset()
|
||||
|
||||
def test_init(self):
|
||||
nd = rpc.NotificationDispatcher()
|
||||
|
||||
expected = [
|
||||
mock.call(new=True),
|
||||
mock.call().declare_topic_consumer(topic='notifications.info',
|
||||
queue_name=mock.ANY,
|
||||
callback=nd._add_to_queue),
|
||||
mock.call().consume_in_thread()
|
||||
]
|
||||
self.create_connection.assert_has_calls(expected)
|
||||
|
||||
def test_add_to_queue(self):
|
||||
nd = rpc.NotificationDispatcher()
|
||||
nd._add_to_queue('foo')
|
||||
self.assertEqual(nd.queue.get(), 'foo')
|
||||
|
||||
def _test_run_dispatch_helper(self, msg, handler):
|
||||
msgs = [msg]
|
||||
|
||||
def side_effect(*args):
|
||||
return msgs.pop(0)
|
||||
|
||||
with mock.patch('eventlet.Queue.get') as queue_get:
|
||||
queue_get.side_effect = side_effect
|
||||
nd = rpc.NotificationDispatcher()
|
||||
# catch the assertion so that the loop runs once
|
||||
self.assertRaises(IndexError, nd.run_dispatch, handler)
|
||||
|
||||
def test_run_dispatch_once(self):
|
||||
class SimpleHandler:
|
||||
def __init__(self):
|
||||
self.network_delete_end = mock.Mock()
|
||||
|
||||
msg = dict(event_type='network.delete.end',
|
||||
payload=dict(network_id='a'))
|
||||
|
||||
handler = SimpleHandler()
|
||||
self._test_run_dispatch_helper(msg, handler)
|
||||
handler.network_delete_end.called_once_with(msg['payload'])
|
||||
|
||||
def test_run_dispatch_missing_handler(self):
|
||||
class SimpleHandler:
|
||||
self.subnet_create_start = mock.Mock()
|
||||
|
||||
msg = dict(event_type='network.delete.end',
|
||||
payload=dict(network_id='a'))
|
||||
|
||||
handler = SimpleHandler()
|
||||
|
||||
with mock.patch('quantum.agent.rpc.LOG') as log:
|
||||
self._test_run_dispatch_helper(msg, handler)
|
||||
log.assert_has_calls([mock.call.debug(mock.ANY, mock.ANY)])
|
||||
|
||||
def test_run_dispatch_handler_raises(self):
|
||||
class SimpleHandler:
|
||||
def network_delete_end(self, payload):
|
||||
raise Exception('foo')
|
||||
|
||||
msg = dict(event_type='network.delete.end',
|
||||
payload=dict(network_id='a'))
|
||||
|
||||
handler = SimpleHandler()
|
||||
|
||||
with mock.patch('quantum.agent.rpc.LOG') as log:
|
||||
self._test_run_dispatch_helper(msg, handler)
|
||||
log.assert_has_calls([mock.call.warn(mock.ANY, mock.ANY)])
|
||||
|
@ -19,12 +19,15 @@ import socket
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
import unittest2 as unittest
|
||||
|
||||
from quantum.agent.common import config
|
||||
from quantum.agent import dhcp_agent
|
||||
from quantum.agent.dhcp_agent import DhcpAgentWithStateReport
|
||||
from quantum.agent.linux import dhcp
|
||||
from quantum.agent.linux import interface
|
||||
from quantum.common import constants
|
||||
from quantum.common import exceptions
|
||||
@ -33,6 +36,7 @@ from quantum.openstack.common import jsonutils
|
||||
|
||||
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
|
||||
ETCDIR = os.path.join(ROOTDIR, 'etc')
|
||||
HOSTNAME = 'hostname'
|
||||
|
||||
|
||||
def etcdir(*p):
|
||||
@ -114,34 +118,65 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
self.driver = mock.Mock(name='driver')
|
||||
self.driver_cls = self.driver_cls_p.start()
|
||||
self.driver_cls.return_value = self.driver
|
||||
self.notification_p = mock.patch(
|
||||
'quantum.agent.rpc.NotificationDispatcher')
|
||||
self.notification = self.notification_p.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.notification_p.stop()
|
||||
self.driver_cls_p.stop()
|
||||
cfg.CONF.reset()
|
||||
|
||||
def test_dhcp_agent_main(self):
|
||||
def test_dhcp_agent_manager(self):
|
||||
state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
|
||||
lease_relay_str = 'quantum.agent.dhcp_agent.DhcpLeaseRelay'
|
||||
with mock.patch.object(DhcpAgentWithStateReport,
|
||||
'sync_state',
|
||||
autospec=True) as mock_sync_state:
|
||||
with mock.patch.object(DhcpAgentWithStateReport,
|
||||
'periodic_resync',
|
||||
autospec=True) as mock_periodic_resync:
|
||||
with mock.patch(state_rpc_str) as state_rpc:
|
||||
with mock.patch(lease_relay_str) as mock_lease_relay:
|
||||
with mock.patch.object(sys, 'argv') as sys_argv:
|
||||
sys_argv.return_value = [
|
||||
'dhcp', '--config-file',
|
||||
etcdir('quantum.conf.test')]
|
||||
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
|
||||
config.register_agent_state_opts_helper(cfg.CONF)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
cfg.CONF.register_opts(
|
||||
dhcp_agent.DeviceManager.OPTS)
|
||||
cfg.CONF.register_opts(
|
||||
dhcp_agent.DhcpLeaseRelay.OPTS)
|
||||
cfg.CONF.register_opts(dhcp.OPTS)
|
||||
cfg.CONF.register_opts(interface.OPTS)
|
||||
cfg.CONF(project='quantum')
|
||||
agent_mgr = DhcpAgentWithStateReport('testhost')
|
||||
eventlet.greenthread.sleep(1)
|
||||
agent_mgr.after_start()
|
||||
mock_sync_state.assert_called_once_with(agent_mgr)
|
||||
mock_periodic_resync.assert_called_once_with(
|
||||
agent_mgr)
|
||||
state_rpc.assert_has_calls(
|
||||
[mock.call(mock.ANY),
|
||||
mock.call().report_state(mock.ANY, mock.ANY)])
|
||||
mock_lease_relay.assert_has_calls(
|
||||
[mock.call(mock.ANY),
|
||||
mock.call().start()])
|
||||
|
||||
def test_dhcp_agent_main_agent_manager(self):
|
||||
logging_str = 'quantum.agent.common.config.setup_logging'
|
||||
manager_str = 'quantum.agent.dhcp_agent.DeviceManager'
|
||||
agent_str = 'quantum.agent.dhcp_agent.DhcpAgent'
|
||||
launcher_str = 'quantum.openstack.common.service.ServiceLauncher'
|
||||
with mock.patch(logging_str):
|
||||
with mock.patch(manager_str) as dev_mgr:
|
||||
with mock.patch(agent_str) as dhcp:
|
||||
with mock.patch.object(sys, 'argv') as sys_argv:
|
||||
sys_argv.return_value = ['dhcp', '--config-file',
|
||||
etcdir('quantum.conf.test')]
|
||||
dhcp_agent.main()
|
||||
dev_mgr.assert_called_once(mock.ANY, 'sudo')
|
||||
dhcp.assert_has_calls([
|
||||
mock.call(mock.ANY),
|
||||
mock.call().run()])
|
||||
with mock.patch.object(sys, 'argv') as sys_argv:
|
||||
with mock.patch(launcher_str) as launcher:
|
||||
sys_argv.return_value = ['dhcp', '--config-file',
|
||||
etcdir('quantum.conf.test')]
|
||||
dhcp_agent.main()
|
||||
launcher.assert_has_calls(
|
||||
[mock.call(), mock.call().launch_service(mock.ANY),
|
||||
mock.call().wait()])
|
||||
|
||||
def test_run_completes_single_pass(self):
|
||||
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
attrs_to_mock = dict(
|
||||
[(a, mock.DEFAULT) for a in
|
||||
['sync_state', 'lease_relay', 'periodic_resync']])
|
||||
@ -151,19 +186,18 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
mocks['periodic_resync'].assert_called_once_with()
|
||||
mocks['lease_relay'].assert_has_mock_calls(
|
||||
[mock.call.start()])
|
||||
self.notification.assert_has_calls([mock.call.run_dispatch()])
|
||||
|
||||
def test_ns_name(self):
|
||||
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
|
||||
mock_net = mock.Mock(id='foo')
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
|
||||
|
||||
def test_ns_name_disabled_namespace(self):
|
||||
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
|
||||
cfg.CONF.set_override('use_namespaces', False)
|
||||
mock_net = mock.Mock(id='foo')
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
self.assertIsNone(dhcp._ns_name(mock_net))
|
||||
|
||||
def test_call_driver(self):
|
||||
@ -185,7 +219,7 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
self.driver.return_value.foo.side_effect = Exception
|
||||
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
|
||||
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
self.assertIsNone(dhcp.call_driver('foo', network))
|
||||
self.assertTrue(dev_mgr.called)
|
||||
self.driver.assert_called_once_with(cfg.CONF,
|
||||
@ -198,7 +232,7 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
|
||||
def test_update_lease(self):
|
||||
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
dhcp.update_lease('net_id', '192.168.1.1', 120)
|
||||
plug.assert_has_calls(
|
||||
[mock.call().update_lease_expiration(
|
||||
@ -209,7 +243,7 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
plug.return_value.update_lease_expiration.side_effect = Exception
|
||||
|
||||
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
dhcp.update_lease('net_id', '192.168.1.1', 120)
|
||||
plug.assert_has_calls(
|
||||
[mock.call().update_lease_expiration(
|
||||
@ -224,7 +258,7 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
mock_plugin.get_active_networks.return_value = active_networks
|
||||
plug.return_value = mock_plugin
|
||||
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
|
||||
attrs_to_mock = dict(
|
||||
[(a, mock.DEFAULT) for a in
|
||||
@ -260,21 +294,21 @@ class TestDhcpAgent(unittest.TestCase):
|
||||
plug.return_value = mock_plugin
|
||||
|
||||
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
dhcp.sync_state()
|
||||
|
||||
self.assertTrue(log.called)
|
||||
self.assertTrue(dhcp.needs_resync)
|
||||
|
||||
def test_periodic_resync(self):
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
|
||||
dhcp.periodic_resync()
|
||||
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
|
||||
|
||||
def test_periodoc_resync_helper(self):
|
||||
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
|
||||
dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
dhcp.needs_resync = True
|
||||
with mock.patch.object(dhcp, 'sync_state') as sync_state:
|
||||
sync_state.side_effect = RuntimeError
|
||||
@ -293,9 +327,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
'quantum.agent.linux.interface.NullDriver')
|
||||
config.register_root_helper(cfg.CONF)
|
||||
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
|
||||
self.notification_p = mock.patch(
|
||||
'quantum.agent.rpc.NotificationDispatcher')
|
||||
self.notification = self.notification_p.start()
|
||||
|
||||
self.plugin_p = mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi')
|
||||
plugin_cls = self.plugin_p.start()
|
||||
@ -307,7 +338,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.cache = mock.Mock()
|
||||
cache_cls.return_value = self.cache
|
||||
|
||||
self.dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
|
||||
self.dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
self.call_driver_p = mock.patch.object(self.dhcp, 'call_driver')
|
||||
|
||||
self.call_driver = self.call_driver_p.start()
|
||||
@ -321,7 +352,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.call_driver_p.stop()
|
||||
self.cache_p.stop()
|
||||
self.plugin_p.stop()
|
||||
self.notification_p.stop()
|
||||
|
||||
def test_enable_dhcp_helper(self):
|
||||
self.plugin.get_network_info.return_value = fake_network
|
||||
@ -462,26 +492,26 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
payload = dict(network=dict(id=fake_network.id))
|
||||
|
||||
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
|
||||
self.dhcp.network_create_end(payload)
|
||||
self.dhcp.network_create_end(None, payload)
|
||||
enable.assertCalledOnceWith(fake_network.id)
|
||||
|
||||
def test_network_update_end_admin_state_up(self):
|
||||
payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
|
||||
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
|
||||
self.dhcp.network_update_end(payload)
|
||||
self.dhcp.network_update_end(None, payload)
|
||||
enable.assertCalledOnceWith(fake_network.id)
|
||||
|
||||
def test_network_update_end_admin_state_down(self):
|
||||
payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
|
||||
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
|
||||
self.dhcp.network_update_end(payload)
|
||||
self.dhcp.network_update_end(None, payload)
|
||||
disable.assertCalledOnceWith(fake_network.id)
|
||||
|
||||
def test_network_delete_end(self):
|
||||
payload = dict(network_id=fake_network.id)
|
||||
|
||||
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
|
||||
self.dhcp.network_delete_end(payload)
|
||||
self.dhcp.network_delete_end(None, payload)
|
||||
disable.assertCalledOnceWith(fake_network.id)
|
||||
|
||||
def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
|
||||
@ -523,13 +553,13 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.cache.get_network_by_id.return_value = fake_network
|
||||
self.plugin.get_network_info.return_value = fake_network
|
||||
|
||||
self.dhcp.subnet_update_end(payload)
|
||||
self.dhcp.subnet_update_end(None, payload)
|
||||
|
||||
self.cache.assert_has_calls([mock.call.put(fake_network)])
|
||||
self.call_driver.assert_called_once_with('reload_allocations',
|
||||
fake_network)
|
||||
|
||||
def test_subnet_update_end(self):
|
||||
def test_subnet_update_end_restart(self):
|
||||
new_state = FakeModel(fake_network.id,
|
||||
tenant_id=fake_network.tenant_id,
|
||||
admin_state_up=True,
|
||||
@ -540,7 +570,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.cache.get_network_by_id.return_value = fake_network
|
||||
self.plugin.get_network_info.return_value = new_state
|
||||
|
||||
self.dhcp.subnet_update_end(payload)
|
||||
self.dhcp.subnet_update_end(None, payload)
|
||||
|
||||
self.cache.assert_has_calls([mock.call.put(new_state)])
|
||||
self.call_driver.assert_called_once_with('restart',
|
||||
@ -558,7 +588,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.cache.get_network_by_id.return_value = prev_state
|
||||
self.plugin.get_network_info.return_value = fake_network
|
||||
|
||||
self.dhcp.subnet_delete_end(payload)
|
||||
self.dhcp.subnet_delete_end(None, payload)
|
||||
|
||||
self.cache.assert_has_calls([
|
||||
mock.call.get_network_by_subnet_id(
|
||||
@ -571,7 +601,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
def test_port_update_end(self):
|
||||
payload = dict(port=vars(fake_port2))
|
||||
self.cache.get_network_by_id.return_value = fake_network
|
||||
self.dhcp.port_update_end(payload)
|
||||
self.dhcp.port_update_end(None, payload)
|
||||
self.cache.assert_has_calls(
|
||||
[mock.call.get_network_by_id(fake_port2.network_id),
|
||||
mock.call.put_port(mock.ANY)])
|
||||
@ -583,7 +613,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
self.cache.get_network_by_id.return_value = fake_network
|
||||
self.cache.get_port_by_id.return_value = fake_port2
|
||||
|
||||
self.dhcp.port_delete_end(payload)
|
||||
self.dhcp.port_delete_end(None, payload)
|
||||
|
||||
self.cache.assert_has_calls(
|
||||
[mock.call.get_port_by_id(fake_port2.id),
|
||||
@ -596,7 +626,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
|
||||
payload = dict(port_id='unknown')
|
||||
self.cache.get_port_by_id.return_value = None
|
||||
|
||||
self.dhcp.port_delete_end(payload)
|
||||
self.dhcp.port_delete_end(None, payload)
|
||||
|
||||
self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
|
||||
self.assertEqual(self.call_driver.call_count, 0)
|
||||
|
Loading…
Reference in New Issue
Block a user