Convert DHCP from polling to RPC

blueprint non-polling-dhcp-impl

This requires a change to devstack/stack.sh.
See https://review.openstack.org/#/c/11278/ for the required changes.

Change-Id: I1ea22c8e1b80e630bcb83f27a31aaeef482aff6c
This commit is contained in:
Mark McClain 2012-08-07 16:23:05 -04:00
parent 90928604d5
commit 96827953be
17 changed files with 1379 additions and 716 deletions

View File

@ -24,24 +24,4 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
# iproute2 package that supports namespaces).
use_namespaces = True
#
# Temporary F2 variables until the Agent <> Quantum Server is reworked in F3
#
# The database used by the OVS Quantum plugin
db_connection = mysql://root:password@localhost/ovs_quantum?charset=utf8
# The database used by the LinuxBridge Quantum plugin
#db_connection = mysql://root:password@localhost/quantum_linux_bridge
# The database used by the Ryu Quantum plugin
#db_connection = mysql://root:password@localhost/ryu_quantum
# The Quantum user information for accessing the Quantum API.
auth_url = http://localhost:35357/v2.0
auth_region = RegionOne
admin_tenant_name = service
admin_user = quantum
admin_password = password
# use_namespaces = True

View File

@ -124,6 +124,25 @@ control_exchange = quantum
# The "host" option should point or resolve to this address.
# rpc_zmq_bind_address = *
# ============ Notification System Options =====================
# Notifications can be sent when network/subnet/port are create, updated or deleted.
# There are four methods of sending notifications, logging (via the
# log_file directive), rpc (via a message queue),
# noop (no notifications sent, the default) or list of them
# Defined in notifier api
notification_driver = quantum.openstack.common.notifier.list_notifier
# default_notification_level = INFO
# myhost = myhost.com
# default_publisher_id = $myhost
# Defined in rabbit_notifier for rpc way
# notification_topics = notifications
# Defined in list_notifier
list_notifier_drivers = quantum.openstack.common.notifier.rabbit_notifier
[QUOTAS]
# resource name(s) that are supported in quota features
# quota_items = network,subnet,port
@ -142,22 +161,3 @@ control_exchange = quantum
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
# ============ Notification System Options =====================
# Notifications can be sent when network/subnet/port are create, updated or deleted.
# There are four methods of sending notifications, logging (via the
# log_file directive), rpc (via a message queue),
# noop (no notifications sent, the default) or list of them
# Defined in notifier api
# notification_driver = quantum.openstack.common.notifier.no_op_notifier
# default_notification_level = INFO
# myhost = myhost.com
# default_publisher_id = $myhost
# Defined in rabbit_notifier for rpc way
# notification_topics = notifications
# Defined in list_notifier
# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier

View File

@ -15,189 +15,291 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import socket
import sys
import time
import uuid
import eventlet
import netaddr
from sqlalchemy.ext import sqlsoup
from quantum.agent import rpc as agent_rpc
from quantum.agent.common import config
from quantum.agent.linux import dhcp
from quantum.agent.linux import interface
from quantum.agent.linux import ip_lib
from quantum.common import exceptions
from quantum.common import topics
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common import importutils
from quantum.openstack.common.rpc import proxy
from quantum.version import version_string
from quantumclient.v2_0 import client
LOG = logging.getLogger(__name__)
State = collections.namedtuple('State',
['networks', 'subnet_hashes', 'ipalloc_hashes'])
class DhcpAgent(object):
OPTS = [
cfg.StrOpt('db_connection', default=''),
cfg.StrOpt('root_helper', default='sudo'),
cfg.StrOpt('dhcp_driver',
default='quantum.agent.linux.dhcp.Dnsmasq',
help="The driver used to manage the DHCP server."),
cfg.IntOpt('polling_interval',
default=3,
help="The time in seconds between state poll requests."),
cfg.IntOpt('reconnect_interval',
default=5,
help="The time in seconds between db reconnect attempts."),
cfg.BoolOpt('use_namespaces', default=True,
help="Allow overlapping IP.")
]
def __init__(self, conf):
self.conf = conf
self.cache = NetworkCache()
self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
self.db = None
self.polling_interval = conf.polling_interval
self.reconnect_interval = conf.reconnect_interval
self._run = True
self.prev_state = State(set(), set(), set())
ctx = context.RequestContext('quantum', 'quantum', is_admin=True)
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
def daemon_loop(self):
while self._run:
delta = self.get_network_state_delta()
if delta is None:
continue
self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
self.notifications = agent_rpc.NotificationDispatcher()
for network in delta.get('new', []):
self.call_driver('enable', network)
for network in delta.get('updated', []):
self.call_driver('reload_allocations', network)
for network in delta.get('deleted', []):
self.call_driver('disable', network)
def run(self):
"""Activate the DHCP agent."""
# enable DHCP for current networks
for network_id in self.plugin_rpc.get_active_networks():
self.enable_dhcp_helper(network_id)
time.sleep(self.polling_interval)
self.notifications.run_dispatch(self)
def _state_builder(self):
"""Polls the Quantum database and returns a represenation
of the network state.
The value returned is a State tuple that contains three sets:
networks, subnet_hashes, and ipalloc_hashes.
The hash sets are a tuple that contains the computed signature of the
obejct's metadata and the network that owns it. Signatures are used
because the objects metadata can change. Python's built-in hash
function is used on the string repr to compute the metadata signature.
"""
try:
if self.db is None:
time.sleep(self.reconnect_interval)
self.db = sqlsoup.SqlSoup(self.conf.db_connection)
LOG.info("Connecting to database \"%s\" on %s" %
(self.db.engine.url.database,
self.db.engine.url.host))
else:
# we have to commit to get the latest view
self.db.commit()
subnets = {}
subnet_hashes = set()
network_admin_up = {}
for network in self.db.networks.all():
network_admin_up[network.id] = network.admin_state_up
for subnet in self.db.subnets.all():
if (not subnet.enable_dhcp or
not network_admin_up[subnet.network_id]):
continue
subnet_hashes.add((hash(str(subnet)), subnet.network_id))
subnets[subnet.id] = subnet.network_id
ipalloc_hashes = set([(hash(str(a)), subnets[a.subnet_id])
for a in self.db.ipallocations.all()
if a.subnet_id in subnets])
networks = set(subnets.itervalues())
return State(networks, subnet_hashes, ipalloc_hashes)
except Exception, e:
LOG.warn('Unable to get network state delta. Exception: %s' % e)
self.db = None
return None
def get_network_state_delta(self):
"""Return a dict containing the sets of networks that are new,
updated, and deleted."""
delta = {}
state = self._state_builder()
if state is None:
return None
# determine the new/deleted networks
delta['deleted'] = self.prev_state.networks - state.networks
delta['new'] = state.networks - self.prev_state.networks
# Get the networks that have subnets added or deleted.
# The change candidates are the net_id portion of the symmetric diff
# between the sets of (subnet_hash,net_id)
candidates = set(
[h[1] for h in
(state.subnet_hashes ^ self.prev_state.subnet_hashes)]
)
# Update with the networks that have had allocations added/deleted.
# change candidates are the net_id portion of the symmetric diff
# between the sets of (alloc_hash,net_id)
candidates.update(
[h[1] for h in
(state.ipalloc_hashes ^ self.prev_state.ipalloc_hashes)]
)
# the updated set will contain new and deleted networks, so remove them
delta['updated'] = candidates - delta['new'] - delta['deleted']
self.prev_state = state
return delta
def call_driver(self, action, network_id):
def call_driver(self, action, network):
"""Invoke an action on a DHCP driver instance."""
try:
# the Driver expects something that is duck typed similar to
# the base models. Augmenting will add support to the SqlSoup
# result, so that the Driver does have to concern itself with our
# db schema.
network = AugmentingWrapper(
self.db.networks.filter_by(id=network_id).one(),
self.db
)
# the base models.
driver = self.dhcp_driver_cls(self.conf,
network,
self.conf.root_helper,
DeviceManager(self.conf,
self.db,
'network:dhcp'))
self.device_manager)
getattr(driver, action)()
except Exception, e:
LOG.warn('Unable to %s dhcp. Exception: %s' % (action, e))
# Manipulate the state so the action will be attempted on next
# loop iteration.
if action == 'disable':
# adding to prev state means we'll try to delete it next time
self.prev_state.networks.add(network_id)
def enable_dhcp_helper(self, network_id):
"""Enable DHCP for a network that meets enabling criteria."""
network = self.plugin_rpc.get_network_info(network_id)
for subnet in network.subnets:
if subnet.enable_dhcp:
if network.admin_state_up:
self.call_driver('enable', network)
self.cache.put(network)
break
def disable_dhcp_helper(self, network_id):
"""Disable DHCP for a network known to the agent."""
network = self.cache.get_network_by_id(network_id)
if network:
self.call_driver('disable', network)
self.cache.remove(network)
def refresh_dhcp_helper(self, network_id):
"""Refresh or disable DHCP for a network depending on the current state
of the network.
"""
if not self.cache.get_network_by_id(network_id):
# DHCP current not running for network.
self.enable_dhcp_helper(network_id)
network = self.plugin_rpc.get_network_info(network_id)
for subnet in network.subnets:
if subnet.enable_dhcp:
self.cache.put(network)
self.call_driver('update_l3', network)
break
else:
# removing means it will look like new next time
self.prev_state.networks.remove(network_id)
self.disable_dhcp_helper(network.id)
def network_create_end(self, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)
def network_update_end(self, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
def network_delete_start(self, payload):
"""Handle the network.detete.start notification event."""
self.disable_dhcp_helper(payload['network_id'])
def subnet_delete_start(self, payload):
"""Handle the subnet.detete.start notification event."""
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if network:
device_id = self.device_manager.get_device_id(network)
self.plugin_rpc.release_port_fixed_ip(network.id, device_id,
subnet_id)
def subnet_update_end(self, payload):
"""Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
def subnet_delete_end(self, payload):
"""Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if network:
self.refresh_dhcp_helper(network.id)
def port_update_end(self, payload):
"""Handle the port.update.end notification event."""
port = DictModel(payload['port'])
network = self.cache.get_network_by_id(port.network_id)
if network:
self.cache.put_port(port)
self.call_driver('reload_allocations', network)
# Use the update handler for the port create event.
port_create_end = port_update_end
def port_delete_end(self, payload):
"""Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id'])
if port:
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
self.call_driver('reload_allocations', network)
class DhcpPluginApi(proxy.RpcProxy):
"""Agent side of the dhcp rpc API.
API version history:
1.0 - Initial version.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic, context):
super(DhcpPluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context
self.host = socket.gethostname()
def get_active_networks(self):
"""Make a remote process call to retrieve the active networks."""
return self.call(self.context,
self.make_msg('get_active_networks', host=self.host),
topic=self.topic)
def get_network_info(self, network_id):
"""Make a remote process call to retrieve network info."""
return DictModel(self.call(self.context,
self.make_msg('get_network_info',
network_id=network_id,
host=self.host),
topic=self.topic))
def get_dhcp_port(self, network_id, device_id):
"""Make a remote process call to create the dhcp port."""
return DictModel(self.call(self.context,
self.make_msg('get_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host),
topic=self.topic))
def release_dhcp_port(self, network_id, device_id):
"""Make a remote process call to release the dhcp port."""
return self.call(self.context,
self.make_msg('release_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host),
topic=self.topic)
def release_port_fixed_ip(self, network_id, device_id, subnet_id):
"""Make a remote process call to release a fixed_ip on the port."""
return self.call(self.context,
self.make_msg('release_port_fixed_ip',
network_id=network_id,
subnet_id=subnet_id,
device_id=device_id,
host=self.host),
topic=self.topic)
class NetworkCache(object):
"""Agent cache of the current network state."""
def __init__(self):
self.cache = {}
self.subnet_lookup = {}
self.port_lookup = {}
def get_network_by_id(self, network_id):
return self.cache.get(network_id)
def get_network_by_subnet_id(self, subnet_id):
return self.cache.get(self.subnet_lookup.get(subnet_id))
def get_network_by_port_id(self, port_id):
return self.cache.get(self.port_lookup.get(port_id))
def put(self, network):
if network.id in self.cache:
self.remove(self.cache[network.id])
self.cache[network.id] = network
for subnet in network.subnets:
self.subnet_lookup[subnet.id] = network.id
for port in network.ports:
self.port_lookup[port.id] = network.id
def remove(self, network):
del self.cache[network.id]
for subnet in network.subnets:
del self.subnet_lookup[subnet.id]
for port in network.ports:
del self.port_lookup[port.id]
def put_port(self, port):
network = self.get_network_by_id(port.network_id)
for index in range(len(network.ports)):
if network.ports[index].id == port.id:
network.ports[index] = port
break
else:
network.ports.append(port)
self.port_lookup[port.id] = network.id
def remove_port(self, port):
network = self.get_network_by_port_id(port.id)
for index in range(len(network.ports)):
if network.ports[index] == port:
del network.ports[index]
del self.port_lookup[port.id]
break
def get_port_by_id(self, port_id):
network = self.get_network_by_port_id(port_id)
if network:
for port in network.ports:
if port.id == port_id:
return port
class DeviceManager(object):
@ -212,20 +314,22 @@ class DeviceManager(object):
help="The driver used to manage the virtual interface.")
]
def __init__(self, conf, db, device_owner=''):
def __init__(self, conf, plugin):
self.conf = conf
self.db = db
self.device_owner = device_owner
self.plugin = plugin
if not conf.interface_driver:
LOG.error(_('You must specify an interface driver'))
self.driver = importutils.import_object(conf.interface_driver, conf)
def get_interface_name(self, network, port=None):
"""Return interface(device) name for use by the DHCP process."""
if not port:
port = self._get_or_create_port(network)
device_id = self.get_device_id(network)
port = self.plugin.get_dhcp_port(network.id, device_id)
return self.driver.get_device_name(port)
def get_device_id(self, network):
"""Return a unique DHCP device ID for this host on the network."""
# There could be more than one dhcp server per network, so create
# a device id that combines host and network ids
@ -233,7 +337,10 @@ class DeviceManager(object):
return 'dhcp%s-%s' % (host_uuid, network.id)
def setup(self, network, reuse_existing=False):
port = self._get_or_create_port(network)
"""Create and initialize a device for network's DHCP on this host."""
device_id = self.get_device_id(network)
port = self.plugin.get_dhcp_port(network.id, device_id)
interface_name = self.get_interface_name(network, port)
if self.conf.use_namespaces:
@ -266,128 +373,45 @@ class DeviceManager(object):
namespace=namespace)
def destroy(self, network):
self.driver.unplug(self.get_interface_name(network))
"""Destroy the device used for the network's DHCP on this host."""
if self.conf.use_namespaces:
namespace = network.id
else:
namespace = None
def _get_or_create_port(self, network):
# todo (mark): reimplement using RPC
# Usage of client lib is a temporary measure.
self.driver.unplug(self.get_interface_name(network),
namespace=namespace)
self.plugin.release_dhcp_port(network.id, self.get_device_id(network))
try:
device_id = self.get_device_id(network)
port_obj = self.db.ports.filter_by(device_id=device_id).one()
port = AugmentingWrapper(port_obj, self.db)
except sqlsoup.SQLAlchemyError, e:
port = self._create_port(network)
return port
def _create_port(self, network):
# todo (mark): reimplement using RPC
# Usage of client lib is a temporary measure.
quantum = client.Client(
username=self.conf.admin_user,
password=self.conf.admin_password,
tenant_name=self.conf.admin_tenant_name,
auth_url=self.conf.auth_url,
auth_strategy=self.conf.auth_strategy,
auth_region=self.conf.auth_region
)
body = dict(port=dict(
admin_state_up=True,
device_id=self.get_device_id(network),
device_owner=self.device_owner,
network_id=network.id,
tenant_id=network.tenant_id,
fixed_ips=[dict(subnet_id=s.id) for s in network.subnets]))
port_dict = quantum.create_port(body)['port']
# we have to call commit since the port was created in outside of
# our current transaction
self.db.commit()
port = AugmentingWrapper(
self.db.ports.filter_by(id=port_dict['id']).one(),
self.db)
return port
def update_l3(self, network):
"""Update the L3 attributes for the current network's DHCP device."""
self.setup(network, reuse_existing=True)
class PortModel(object):
def __init__(self, port_dict):
self.__dict__.update(port_dict)
class DictModel(object):
"""Convert dict into an object that provides attribute access to values."""
def __init__(self, d):
for key, value in d.iteritems():
if isinstance(value, list):
value = [DictModel(item) if isinstance(item, dict) else item
for item in value]
elif isinstance(value, dict):
value = DictModel(value)
class AugmentingWrapper(object):
"""A wrapper that augments Sqlsoup results so that they look like the
base v2 db model.
"""
MAPPING = {
'networks': {'subnets': 'subnets', 'ports': 'ports'},
'subnets': {'allocations': 'ipallocations'},
'ports': {'fixed_ips': 'ipallocations'},
}
def __init__(self, obj, db):
self.obj = obj
self.db = db
def __repr__(self):
return repr(self.obj)
def __getattr__(self, name):
"""Executes a dynamic lookup of attributes to make SqlSoup results
mimic the same structure as the v2 db models.
The actual models could not be used because they're dependent on the
plugin and the agent is not tied to any plugin structure.
If .subnet, is accessed, the wrapper will return a subnet
object if this instance has a subnet_id attribute.
If the _id attribute does not exists then wrapper will check MAPPING
to see if a reverse relationship exists. If so, a wrapped result set
will be returned.
"""
try:
return getattr(self.obj, name)
except:
pass
id_attr = '%s_id' % name
if hasattr(self.obj, id_attr):
args = {'id': getattr(self.obj, id_attr)}
return AugmentingWrapper(
getattr(self.db, '%ss' % name).filter_by(**args).one(),
self.db
)
try:
attr_name = self.MAPPING[self.obj._table.name][name]
arg_name = '%s_id' % self.obj._table.name[:-1]
args = {arg_name: self.obj.id}
return [AugmentingWrapper(o, self.db) for o in
getattr(self.db, attr_name).filter_by(**args).all()]
except KeyError:
pass
raise AttributeError
setattr(self, key, value)
def main():
conf = config.setup_conf()
conf.register_opts(DhcpAgent.OPTS)
conf.register_opts(DeviceManager.OPTS)
conf.register_opts(dhcp.OPTS)
conf.register_opts(interface.OPTS)
conf(sys.argv)
config.setup_logging(conf)
eventlet.monkey_patch()
cfg.CONF.register_opts(DhcpAgent.OPTS)
cfg.CONF.register_opts(DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp.OPTS)
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF(args=sys.argv, project='quantum')
config.setup_logging(cfg.CONF)
mgr = DhcpAgent(conf)
mgr.daemon_loop()
mgr = DhcpAgent(cfg.CONF)
mgr.run()
if __name__ == '__main__':

View File

@ -74,6 +74,10 @@ class DhcpBase(object):
def disable(self):
"""Disable dhcp for this network."""
@abc.abstractmethod
def update_l3(self, subnet, reason):
"""Alert the driver that a subnet has changed."""
def restart(self):
"""Restart the dhcp service for the network."""
self.disable()
@ -125,6 +129,11 @@ class DhcpLocalProcess(DhcpBase):
else:
LOG.debug(_('No DHCP started for %s') % self.network.id)
def update_l3(self):
"""Update the L3 settings for the interface and reload settings."""
self.device_delegate.update_l3(self.network)
self.reload_allocations()
def get_conf_file_name(self, kind, ensure_conf_dir=False):
"""Returns the file name for a given kind of config file."""
confs_dir = os.path.abspath(os.path.normpath(self.conf.dhcp_confs))

View File

@ -90,7 +90,7 @@ class LinuxInterfaceDriver(object):
"""Plug in the interface."""
@abc.abstractmethod
def unplug(self, device_name, bridge=None):
def unplug(self, device_name, bridge=None, namespace=None):
"""Unplug the interface."""
@ -99,7 +99,7 @@ class NullDriver(LinuxInterfaceDriver):
bridge=None, namespace=None):
pass
def unplug(self, device_name, bridge=None):
def unplug(self, device_name, bridge=None, namespace=None):
pass
@ -143,7 +143,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
namespace_obj.add_device_to_namespace(device)
device.link.set_up()
def unplug(self, device_name, bridge=None):
def unplug(self, device_name, bridge=None, namespace=None):
"""Unplug the interface."""
if not bridge:
bridge = self.conf.ovs_integration_bridge
@ -180,9 +180,9 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver):
else:
LOG.warn(_("Device %s already exists") % device_name)
def unplug(self, device_name, bridge=None):
def unplug(self, device_name, bridge=None, namespace=None):
"""Unplug the interface."""
device = ip_lib.IPDevice(device_name, self.conf.root_helper)
device = ip_lib.IPDevice(device_name, self.conf.root_helper, namespace)
try:
device.link.delete()
LOG.debug(_("Unplugged interface '%s'") % device_name)

View File

@ -191,8 +191,10 @@ class IpLinkCommand(IpDeviceCommandBase):
return self._parse_line(self._run('show', self.name, options='o'))
def _parse_line(self, value):
device_name, settings = value.replace("\\", '').split('>', 1)
if not value:
return {}
device_name, settings = value.replace("\\", '').split('>', 1)
tokens = settings.split()
keys = tokens[::2]
values = [int(v) if v.isdigit() else v for v in tokens[1::2]]
@ -286,4 +288,4 @@ def device_exists(device_name, root_helper=None, namespace=None):
address = IPDevice(device_name, root_helper, namespace).link.address
except RuntimeError:
return False
return True
return bool(address)

View File

@ -13,9 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import eventlet
from quantum.common import topics
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.openstack.common.notifier import api
from quantum.openstack.common.notifier import rabbit_notifier
LOG = logging.getLogger(__name__)
def create_consumers(dispatcher, prefix, topic_details):
@ -67,3 +77,32 @@ class PluginApi(proxy.RpcProxy):
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
topic=self.topic)
class NotificationDispatcher(object):
def __init__(self):
# Set the Queue size to 1 so that messages stay on server rather than
# being buffered in the process.
self.queue = eventlet.queue.Queue(1)
self.connection = rpc.create_connection(new=True)
topic = '%s.%s' % (rabbit_notifier.CONF.notification_topics[0],
api.CONF.default_notification_level.lower())
self.connection.declare_topic_consumer(topic=topic,
callback=self._add_to_queue)
self.connection.consume_in_thread()
def _add_to_queue(self, msg):
self.queue.put(msg)
def run_dispatch(self, handler):
while True:
msg = self.queue.get()
name = msg['event_type'].replace('.', '_')
try:
if hasattr(handler, name):
getattr(handler, name)(msg['payload'])
else:
LOG.debug('Unknown event_type: %s.' % msg['event_type'])
except Exception, e:
LOG.warn('Error processing message. Exception: %s' % e)

View File

@ -23,6 +23,7 @@ UPDATE = 'update'
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
def get_topic_name(prefix, table, operation):

173
quantum/db/dhcp_rpc_base.py Normal file
View File

@ -0,0 +1,173 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from sqlalchemy.orm import exc
from quantum import context as quantum_context
from quantum import manager
from quantum.api.v2 import attributes
from quantum.db import api as db
from quantum.openstack.common import context
LOG = logging.getLogger(__name__)
def augment_context(context):
"""Augments RPC with additional attributes, so that plugin calls work."""
return quantum_context.Context(context.user, None, is_admin=True,
roles=['admin'])
class DhcpRpcCallbackMixin(object):
"""A mix-in that enable DHCP agent support in plugin implementations."""
def get_active_networks(self, context, **kwargs):
"""Retrieve and return a list of the active network ids."""
host = kwargs.get('host')
LOG.debug('Network list requested from %s', host)
plugin = manager.QuantumManager.get_plugin()
context = augment_context(context)
filters = dict(admin_state_up=[True])
return [net['id'] for net in
plugin.get_networks(context, filters=filters)]
def get_network_info(self, context, **kwargs):
"""Retrieve and return a extended information about a network."""
network_id = kwargs.get('network_id')
context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
network = plugin.get_network(context, network_id)
filters = dict(network_id=[network_id])
network['subnets'] = plugin.get_subnets(context, filters=filters)
network['ports'] = plugin.get_ports(context, filters=filters)
return network
def get_dhcp_port(self, context, **kwargs):
"""Allocate a DHCP port for the host and return port information.
This method will re-use an existing port if one already exists. When a
port is re-used, the fixed_ip allocation will be updated to the current
network state.
"""
host = kwargs.get('host')
network_id = kwargs.get('network_id')
device_id = kwargs.get('device_id')
# There could be more than one dhcp server per network, so create
# a device id that combines host and network ids
LOG.debug('Port %s for %s requested from %s', device_id, network_id,
host)
context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
retval = None
filters = dict(network_id=[network_id])
subnets = dict([(s['id'], s) for s in
plugin.get_subnets(context, filters=filters)])
dhcp_enabled_subnet_ids = [s['id'] for s in
subnets.values() if s['enable_dhcp']]
try:
filters = dict(network_id=[network_id], device_id=[device_id])
ports = plugin.get_ports(context, filters=filters)
if len(ports):
# Ensure that fixed_ips cover all dhcp_enabled subnets.
port = ports[0]
for fixed_ip in port['fixed_ips']:
if fixed_ip['subnet_id'] in dhcp_enabled_subnet_ids:
dhcp_enabled_subnet_ids.remove(fixed_ip['subnet_id'])
port['fixed_ips'].extend(
[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
retval = plugin.update_port(context, port['id'],
dict(port=port))
except exc.NoResultFound:
pass
if retval is None:
# No previous port exists, so create a new one.
LOG.debug('DHCP port %s for %s created', device_id, network_id,
host)
network = plugin.get_network(context, network_id)
port_dict = dict(
admin_state_up=True,
device_id=device_id,
network_id=network_id,
tenant_id=network['tenant_id'],
mac_address=attributes.ATTR_NOT_SPECIFIED,
name='DHCP Agent',
device_owner='network:dhcp',
fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
retval = plugin.create_port(context, dict(port=port_dict))
# Convert subnet_id to subnet dict
for fixed_ip in retval['fixed_ips']:
subnet_id = fixed_ip.pop('subnet_id')
fixed_ip['subnet'] = subnets[subnet_id]
return retval
def release_dhcp_port(self, context, **kwargs):
"""Release the port currently being used by a DHCP agent."""
host = kwargs.get('host')
network_id = kwargs.get('network_id')
device_id = kwargs.get('device_id')
LOG.debug('DHCP port deletion for %s d request from %s', network_id,
host)
context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
filters = dict(network_id=[network_id], device_id=[device_id])
ports = plugin.get_ports(context, filters=filters)
if len(ports):
plugin.delete_port(context, ports[0]['id'])
def release_port_fixed_ip(self, context, **kwargs):
"""Release the fixed_ip associated the subnet on a port."""
host = kwargs.get('host')
network_id = kwargs.get('network_id')
device_id = kwargs.get('device_id')
subnet_id = kwargs.get('subnet_id')
LOG.debug('DHCP port remove fixed_ip for %s d request from %s',
subnet_id,
host)
context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
filters = dict(network_id=[network_id], device_id=[device_id])
ports = plugin.get_ports(context, filters=filters)
if len(ports):
port = ports[0]
fixed_ips = port.get('fixed_ips', [])
for i in range(len(fixed_ips)):
if fixed_ips[i]['subnet_id'] == subnet_id:
del fixed_ips[i]
break
plugin.update_port(context, port['id'], dict(port=port))

View File

@ -22,6 +22,7 @@ from quantum.common import exceptions as q_exc
from quantum.common import topics
from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import models_v2
from quantum.openstack.common import context
from quantum.openstack.common import cfg
@ -36,7 +37,7 @@ from quantum import policy
LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks():
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'

View File

@ -30,6 +30,7 @@ from quantum.common import topics
from quantum.common.utils import find_config_file
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import models_v2
from quantum.openstack.common import context
from quantum.openstack.common import cfg
@ -44,7 +45,7 @@ from quantum import policy
LOG = logging.getLogger(__name__)
class OVSRpcCallbacks():
class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'

View File

@ -0,0 +1,119 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from quantum.agent import rpc
from quantum.openstack.common import cfg
class AgentRPCMethods(unittest.TestCase):
def test_create_consumers(self):
dispatcher = mock.Mock()
expected = [
mock.call(new=True),
mock.call().create_consumer('foo-topic-op', dispatcher,
fanout=True),
mock.call().consume_in_thread()
]
call_to_patch = 'quantum.openstack.common.rpc.create_connection'
with mock.patch(call_to_patch) as create_connection:
conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
class AgentRPCNotificationDispatcher(unittest.TestCase):
def setUp(self):
self.create_connection_p = mock.patch(
'quantum.openstack.common.rpc.create_connection')
self.create_connection = self.create_connection_p.start()
cfg.CONF.set_override('default_notification_level', 'INFO')
cfg.CONF.set_override('notification_topics', ['notifications'])
def tearDown(self):
self.create_connection_p.stop()
cfg.CONF.reset()
def test_init(self):
nd = rpc.NotificationDispatcher()
expected = [
mock.call(new=True),
mock.call().declare_topic_consumer(topic='notifications.info',
callback=nd._add_to_queue),
mock.call().consume_in_thread()
]
self.create_connection.assert_has_calls(expected)
def test_add_to_queue(self):
nd = rpc.NotificationDispatcher()
nd._add_to_queue('foo')
self.assertEqual(nd.queue.get(), 'foo')
def _test_run_dispatch_helper(self, msg, handler):
msgs = [msg]
def side_effect(*args):
return msgs.pop(0)
with mock.patch('eventlet.Queue.get') as queue_get:
queue_get.side_effect = side_effect
nd = rpc.NotificationDispatcher()
# catch the assertion so that the loop runs once
self.assertRaises(IndexError, nd.run_dispatch, handler)
def test_run_dispatch_once(self):
class SimpleHandler:
def __init__(self):
self.network_delete_end = mock.Mock()
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
self._test_run_dispatch_helper(msg, handler)
handler.network_delete_end.called_once_with(msg['payload'])
def test_run_dispatch_missing_handler(self):
class SimpleHandler:
self.subnet_create_start = mock.Mock()
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
with mock.patch('quantum.agent.rpc.LOG') as log:
self._test_run_dispatch_helper(msg, handler)
log.assert_has_calls([mock.call.debug(mock.ANY)])
def test_run_dispatch_handler_raises(self):
class SimpleHandler:
def network_delete_end(self, payload):
raise Exception('foo')
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
with mock.patch('quantum.agent.rpc.LOG') as log:
self._test_run_dispatch_helper(msg, handler)
log.assert_has_calls([mock.call.warn(mock.ANY)])

View File

@ -31,6 +31,7 @@ from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config
from quantum import context
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
from quantum.tests.unit import test_extensions

View File

@ -0,0 +1,165 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
from quantum.db import dhcp_rpc_base
class TestDhcpAugmentContext(unittest.TestCase):
def test_augment_context(self):
context = mock.Mock()
context.user = 'quantum'
context.tenant = None
context.is_admin = True
new_context = dhcp_rpc_base.augment_context(context)
self.assertEqual(new_context.user_id, context.user)
self.assertEqual(new_context.roles, ['admin'])
class TestDhcpRpcCallackMixin(unittest.TestCase):
def setUp(self):
self.context_p = mock.patch('quantum.db.dhcp_rpc_base.augment_context')
self.context_p.start()
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start()
self.plugin = mock.Mock()
get_plugin.return_value = self.plugin
self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin()
self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG')
self.log = self.log_p.start()
def tearDown(self):
self.log_p.stop()
self.plugin_p.stop()
self.context_p.stop()
def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')]
self.plugin.get_networks.return_value = plugin_retval
networks = self.callbacks.get_active_networks(mock.Mock(), host='host')
self.assertEqual(networks, ['a', 'b'])
self.plugin.assert_has_calls(
[mock.call.get_networks(mock.ANY,
filters=dict(admin_state_up=[True]))])
self.assertEqual(len(self.log.mock_calls), 1)
def test_get_network_info(self):
network_retval = dict(id='a')
subnet_retval = mock.Mock()
port_retval = mock.Mock()
self.plugin.get_network.return_value = network_retval
self.plugin.get_subnets.return_value = subnet_retval
self.plugin.get_ports.return_value = port_retval
retval = self.callbacks.get_network_info(mock.Mock(), network_id='a')
self.assertEquals(retval, network_retval)
self.assertEqual(retval['subnets'], subnet_retval)
self.assertEqual(retval['ports'], port_retval)
def _test_get_dhcp_port_helper(self, port_retval, other_expectations=[],
update_port=None, create_port=None):
subnets_retval = [dict(id='a', enable_dhcp=True),
dict(id='b', enable_dhcp=False)]
self.plugin.get_subnets.return_value = subnets_retval
if port_retval:
self.plugin.get_ports.return_value = [port_retval]
else:
self.plugin.get_ports.return_value = []
self.plugin.update_port.return_value = update_port
self.plugin.create_port.return_value = create_port
retval = self.callbacks.get_dhcp_port(mock.Mock(),
network_id='netid',
device_id='devid',
host='host')
expected = [mock.call.get_subnets(mock.ANY,
filters=dict(network_id=['netid'])),
mock.call.get_ports(mock.ANY,
filters=dict(network_id=['netid'],
device_id=['devid']))]
expected.extend(other_expectations)
self.plugin.assert_has_calls(expected)
return retval
def test_get_dhcp_port_existing(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
expectations = [
mock.call.update_port(mock.ANY, 'port_id', dict(port=port_retval))]
retval = self._test_get_dhcp_port_helper(port_retval, expectations,
update_port=port_retval)
self.assertEqual(len(self.log.mock_calls), 1)
def test_get_dhcp_port_create_new(self):
self.plugin.get_network.return_value = dict(tenant_id='tenantid')
create_spec = dict(tenant_id='tenantid', device_id='devid',
network_id='netid', name='DHCP Agent',
admin_state_up=True,
device_owner='network:dhcp',
mac_address=mock.ANY)
create_retval = create_spec.copy()
create_retval['id'] = 'port_id'
create_retval['fixed_ips'] = [dict(subnet_id='a', enable_dhcp=True)]
create_spec['fixed_ips'] = [dict(subnet_id='a')]
expectations = [
mock.call.get_network(mock.ANY, 'netid'),
mock.call.create_port(mock.ANY, dict(port=create_spec))]
retval = self._test_get_dhcp_port_helper(None, expectations,
create_port=create_retval)
self.assertEqual(create_retval, retval)
self.assertEqual(len(self.log.mock_calls), 2)
def test_release_dhcp_port(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
self.plugin.get_ports.return_value = [port_retval]
self.callbacks.release_dhcp_port(mock.ANY, network_id='netid',
device_id='devid')
self.plugin.assert_has_calls([
mock.call.get_ports(mock.ANY, filters=dict(network_id=['netid'],
device_id=['devid'])),
mock.call.delete_port(mock.ANY, 'port_id')])
def test_release_port_fixed_ip(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
port_update = dict(id='port_id', fixed_ips=[])
self.plugin.get_ports.return_value = [port_retval]
self.callbacks.release_port_fixed_ip(mock.ANY, network_id='netid',
device_id='devid', subnet_id='a')
self.plugin.assert_has_calls([
mock.call.get_ports(mock.ANY, filters=dict(network_id=['netid'],
device_id=['devid'])),
mock.call.update_port(mock.ANY, 'port_id',
dict(port=port_update))])

File diff suppressed because it is too large Load Diff

View File

@ -133,6 +133,9 @@ class TestDhcpBase(unittest.TestCase):
def disable(self):
self.called.append('disable')
def update_l3(self):
pass
def reload_allocations(self):
pass
@ -285,6 +288,20 @@ class TestDhcpLocalProcess(TestBase):
'cccccccc-cccc-cccc-cccc-cccccccccccc', 'kill', '-9', 5]
self.execute.assert_called_once_with(exp_args, root_helper='sudo')
def test_update_l3(self):
delegate = mock.Mock()
fake_net = FakeDualNetwork()
with mock.patch.object(LocalChild, 'active') as active:
active.__get__ = mock.Mock(return_value=False)
lp = LocalChild(self.conf,
fake_net,
device_delegate=delegate)
lp.update_l3()
delegate.assert_has_calls(
[mock.call.update_l3(fake_net)])
self.assertEqual(lp.called, ['reload'])
def test_pid(self):
with mock.patch('__builtin__.open') as mock_open:
mock_open.return_value.__enter__ = lambda s: s

View File

@ -54,6 +54,7 @@ class FakePort:
fixed_ips = [FakeAllocation]
device_id = 'cccccccc-cccc-cccc-cccc-cccccccccccc'
network = FakeNetwork()
network_id = network.id
class TestBase(unittest.TestCase):
@ -252,7 +253,7 @@ class TestBridgeInterfaceDriver(TestBase):
br.unplug('tap0')
log.assert_called_once()
self.ip_dev.assert_has_calls([mock.call('tap0', 'sudo'),
self.ip_dev.assert_has_calls([mock.call('tap0', 'sudo', None),
mock.call().link.delete()])