Merge "Enables BigSwitch/Restproxy ML2 VLAN driver"

This commit is contained in:
Jenkins 2014-02-11 06:51:31 +00:00 committed by Gerrit Code Review
commit afd0165421
7 changed files with 502 additions and 374 deletions

View File

@ -36,8 +36,13 @@ def put_port_hostid(context, port_id, host):
LOG.warning(_("No host_id in port request to track port location.")) LOG.warning(_("No host_id in port request to track port location."))
return return
if port_id == '': if port_id == '':
LOG.warning(_("Received an empty port ID for host '%s'"), host) LOG.warning(_("Received an empty port ID for host_id '%s'"), host)
return return
if host == '':
LOG.debug(_("Received an empty host_id for port '%s'"), port_id)
return
LOG.debug(_("Logging port %(port)s on host_id %(host)s"),
{'port': port_id, 'host': host})
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
location = portbindings_db.PortBindingPort(port_id=port_id, host=host) location = portbindings_db.PortBindingPort(port_id=port_id, host=host)
context.session.merge(location) context.session.merge(location)

View File

@ -91,7 +91,7 @@ restproxy_opts = [
"which performs the networking configuration. Note that " "which performs the networking configuration. Note that "
"only one server is needed per deployment, but you may " "only one server is needed per deployment, but you may "
"wish to deploy multiple servers to support failover.")), "wish to deploy multiple servers to support failover.")),
cfg.StrOpt('server_auth', default='username:password', secret=True, cfg.StrOpt('server_auth', default=None, secret=True,
help=_("The username and password for authenticating against " help=_("The username and password for authenticating against "
" the BigSwitch or Floodlight controller.")), " the BigSwitch or Floodlight controller.")),
cfg.BoolOpt('server_ssl', default=False, cfg.BoolOpt('server_ssl', default=False,
@ -202,12 +202,12 @@ class ServerProxy(object):
headers['Authorization'] = self.auth headers['Authorization'] = self.auth
LOG.debug(_("ServerProxy: server=%(server)s, port=%(port)d, " LOG.debug(_("ServerProxy: server=%(server)s, port=%(port)d, "
"ssl=%(ssl)r, action=%(action)s"), "ssl=%(ssl)r"),
{'server': self.server, 'port': self.port, 'ssl': self.ssl, {'server': self.server, 'port': self.port, 'ssl': self.ssl})
LOG.debug(_("ServerProxy: resource=%(resource)s, action=%(action)s, "
"data=%(data)r, headers=%(headers)r"),
{'resource': resource, 'data': data, 'headers': headers,
'action': action}) 'action': action})
LOG.debug(_("ServerProxy: resource=%(resource)s, data=%(data)r, "
"headers=%(headers)r"),
{'resource': resource, 'data': data, 'headers': headers})
conn = None conn = None
if self.ssl: if self.ssl:
@ -252,17 +252,33 @@ class ServerProxy(object):
class ServerPool(object): class ServerPool(object):
def __init__(self, servers, ssl, auth, neutron_id, timeout=10, def __init__(self, timeout=10,
base_uri='/quantum/v1.0', name='NeutronRestProxy'): base_uri=BASE_URI, name='NeutronRestProxy'):
LOG.debug(_("ServerPool: initializing"))
# 'servers' is the list of network controller REST end-points
# (used in order specified till one succeeds, and it is sticky
# till next failure). Use 'server_auth' to encode api-key
servers = cfg.CONF.RESTPROXY.servers
self.auth = cfg.CONF.RESTPROXY.server_auth
self.ssl = cfg.CONF.RESTPROXY.server_ssl
self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
self.base_uri = base_uri self.base_uri = base_uri
self.timeout = timeout
self.name = name self.name = name
self.auth = auth timeout = cfg.CONF.RESTPROXY.server_timeout
self.ssl = ssl if timeout is not None:
self.neutron_id = neutron_id self.timeout = timeout
self.servers = []
for server_port in servers: # validate config
self.servers.append(self.server_proxy_for(*server_port)) if not servers:
raise cfg.Error(_('Servers not defined. Aborting plugin'))
if any((len(spl) != 2) for spl in [sp.split(':', 1)
for sp in servers.split(',')]):
raise cfg.Error(_('Servers must be defined as <ip>:<port>'))
self.servers = [
self.server_proxy_for(server, int(port))
for server, port in (s.rsplit(':', 1) for s in servers.split(','))
]
LOG.debug(_("ServerPool: initialization done"))
def server_proxy_for(self, server, port): def server_proxy_for(self, server, port):
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id, return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
@ -379,39 +395,28 @@ class ServerPool(object):
errstr = _("Unable to update remote network: %s") errstr = _("Unable to update remote network: %s")
self.rest_action('DELETE', resource, errstr=errstr) self.rest_action('DELETE', resource, errstr=errstr)
def rest_create_port(self, net, port): def rest_create_port(self, tenant_id, net_id, port):
resource = PORT_RESOURCE_PATH % (net["tenant_id"], net["id"]) resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
data = {"port": port} data = {"port": port}
device_id = port.get("device_id")
if not port["mac_address"] or not device_id:
# controller only cares about ports attached to devices
LOG.warning(_("No device attached to port %s. "
"Skipping notification to controller."), port["id"])
return
data["attachment"] = {"id": device_id,
"mac": port["mac_address"]}
errstr = _("Unable to create remote port: %s") errstr = _("Unable to create remote port: %s")
self.rest_action('POST', resource, data, errstr)
def rest_update_port(self, tenant_id, network_id, port, port_id):
resource = PORTS_PATH % (tenant_id, network_id, port_id)
data = {"port": port}
errstr = _("Unable to update remote port: %s")
self.rest_action('PUT', resource, data, errstr) self.rest_action('PUT', resource, data, errstr)
def rest_delete_port(self, tenant_id, network_id, port_id): def rest_delete_port(self, tenant_id, network_id, port_id):
resource = PORTS_PATH % (tenant_id, network_id, port_id) resource = ATTACHMENT_PATH % (tenant_id, network_id, port_id)
errstr = _("Unable to delete remote port: %s") errstr = _("Unable to delete remote port: %s")
self.rest_action('DELETE', resource, errstr=errstr) self.rest_action('DELETE', resource, errstr=errstr)
def rest_plug_interface(self, tenant_id, net_id, port, def rest_update_port(self, tenant_id, net_id, port):
remote_interface_id): # Controller has no update operation for the port endpoint
if port["mac_address"] is not None: self.rest_create_port(tenant_id, net_id, port)
resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
data = {"attachment":
{"id": remote_interface_id,
"mac": port["mac_address"],
}
}
errstr = _("Unable to plug in interface: %s")
self.rest_action('PUT', resource, data, errstr)
def rest_unplug_interface(self, tenant_id, net_id, port_id):
resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
errstr = _("Unable to unplug interface: %s")
self.rest_action('DELETE', resource, errstr=errstr)
class RpcProxy(dhcp_rpc_base.DhcpRpcCallbackMixin): class RpcProxy(dhcp_rpc_base.DhcpRpcCallbackMixin):
@ -423,9 +428,227 @@ class RpcProxy(dhcp_rpc_base.DhcpRpcCallbackMixin):
agents_db.AgentExtRpcCallback()]) agents_db.AgentExtRpcCallback()])
class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2, class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
routerrule_db.RouterRule_db_mixin, routerrule_db.RouterRule_db_mixin):
supported_extension_aliases = ["binding"]
servers = None
def __init__(self, server_timeout=None):
# This base class is not intended to be instantiated directly.
# Extending class should set ServerPool.
if not self.servers:
LOG.warning(_("ServerPool not set!"))
def _send_all_data(self, send_ports=True, send_floating_ips=True,
send_routers=True):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
This gives the controller an option to re-sync it's persistent store
with neutron's current view of that data.
"""
admin_context = qcontext.get_admin_context()
networks = []
all_networks = self.get_networks(admin_context) or []
for net in all_networks:
mapped_network = self._get_mapped_network_with_subnets(net)
flips_n_ports = {}
if send_floating_ips:
flips_n_ports = self._get_network_with_floatingips(
mapped_network)
if send_ports:
ports = []
net_filter = {'network_id': [net.get('id')]}
net_ports = self.get_ports(admin_context,
filters=net_filter) or []
for port in net_ports:
mapped_port = self._map_state_and_status(port)
mapped_port['attachment'] = {
'id': port.get('device_id'),
'mac': port.get('mac_address'),
}
mapped_port = self._extend_port_dict_binding(admin_context,
mapped_port)
ports.append(mapped_port)
flips_n_ports['ports'] = ports
if flips_n_ports:
networks.append(flips_n_ports)
resource = '/topology'
data = {
'networks': networks,
}
if send_routers:
routers = []
all_routers = self.get_routers(admin_context) or []
for router in all_routers:
interfaces = []
mapped_router = self._map_state_and_status(router)
router_filter = {
'device_owner': ["network:router_interface"],
'device_id': [router.get('id')]
}
router_ports = self.get_ports(admin_context,
filters=router_filter) or []
for port in router_ports:
net_id = port.get('network_id')
subnet_id = port['fixed_ips'][0]['subnet_id']
intf_details = self._get_router_intf_details(admin_context,
net_id,
subnet_id)
interfaces.append(intf_details)
mapped_router['interfaces'] = interfaces
routers.append(mapped_router)
data.update({'routers': routers})
errstr = _("Unable to update remote topology: %s")
return self.servers.rest_action('PUT', resource, data, errstr)
def _get_network_with_floatingips(self, network, context=None):
if context is None:
context = qcontext.get_admin_context()
net_id = network['id']
net_filter = {'floating_network_id': [net_id]}
fl_ips = self.get_floatingips(context,
filters=net_filter) or []
network['floatingips'] = fl_ips
return network
def _get_all_subnets_json_for_network(self, net_id, context=None):
if context is None:
context = qcontext.get_admin_context()
# start a sub-transaction to avoid breaking parent transactions
with context.session.begin(subtransactions=True):
subnets = self._get_subnets_by_network(context,
net_id)
subnets_details = []
if subnets:
for subnet in subnets:
subnet_dict = self._make_subnet_dict(subnet)
mapped_subnet = self._map_state_and_status(subnet_dict)
subnets_details.append(mapped_subnet)
return subnets_details
def _get_mapped_network_with_subnets(self, network, context=None):
# if context is not provided, admin context is used
if context is None:
context = qcontext.get_admin_context()
network = self._map_state_and_status(network)
subnets = self._get_all_subnets_json_for_network(network['id'],
context)
network['subnets'] = subnets
for subnet in (subnets or []):
if subnet['gateway_ip']:
# FIX: For backward compatibility with wire protocol
network['gateway'] = subnet['gateway_ip']
break
else:
network['gateway'] = ''
network[external_net.EXTERNAL] = self._network_is_external(
context, network['id'])
# include ML2 segmentation types
network['segmentation_types'] = getattr(self, "segmentation_types", "")
return network
def _send_create_network(self, network, context=None):
tenant_id = network['tenant_id']
mapped_network = self._get_mapped_network_with_subnets(network,
context)
self.servers.rest_create_network(tenant_id, mapped_network)
def _send_update_network(self, network, context=None):
net_id = network['id']
tenant_id = network['tenant_id']
mapped_network = self._get_mapped_network_with_subnets(network,
context)
net_fl_ips = self._get_network_with_floatingips(mapped_network,
context)
self.servers.rest_update_network(tenant_id, net_id, net_fl_ips)
def _send_delete_network(self, network, context=None):
net_id = network['id']
tenant_id = network['tenant_id']
self.servers.rest_delete_network(tenant_id, net_id)
def _map_state_and_status(self, resource):
resource = copy.copy(resource)
resource['state'] = ('UP' if resource.pop('admin_state_up',
True) else 'DOWN')
if 'status' in resource:
del resource['status']
return resource
def _warn_on_state_status(self, resource):
if resource.get('admin_state_up', True) is False:
LOG.warning(_("Setting admin_state_up=False is not supported "
"in this plugin version. Ignoring setting for "
"resource: %s"), resource)
if 'status' in resource:
if resource['status'] is not const.NET_STATUS_ACTIVE:
LOG.warning(_("Operational status is internally set by the "
"plugin. Ignoring setting status=%s."),
resource['status'])
def _get_router_intf_details(self, context, intf_id, subnet_id):
# we will use the network id as interface's id
net_id = intf_id
network = self.get_network(context, net_id)
subnet = self.get_subnet(context, subnet_id)
mapped_network = self._get_mapped_network_with_subnets(network)
mapped_subnet = self._map_state_and_status(subnet)
data = {
'id': intf_id,
"network": mapped_network,
"subnet": mapped_subnet
}
return data
def _extend_port_dict_binding(self, context, port):
cfg_vif_type = cfg.CONF.NOVA.vif_type.lower()
if not cfg_vif_type in (portbindings.VIF_TYPE_OVS,
portbindings.VIF_TYPE_IVS):
LOG.warning(_("Unrecognized vif_type in configuration "
"[%s]. Defaulting to ovs."),
cfg_vif_type)
cfg_vif_type = portbindings.VIF_TYPE_OVS
hostid = porttracker_db.get_port_hostid(context, port['id'])
if hostid:
port[portbindings.HOST_ID] = hostid
override = self._check_hostvif_override(hostid)
if override:
cfg_vif_type = override
port[portbindings.VIF_TYPE] = cfg_vif_type
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
return port
def _check_hostvif_override(self, hostid):
for v in cfg.CONF.NOVA.vif_types:
if hostid in getattr(cfg.CONF.NOVA, "node_override_vif_" + v, []):
return v
return False
class NeutronRestProxyV2(NeutronRestProxyV2Base,
extradhcpopt_db.ExtraDhcpOptMixin, extradhcpopt_db.ExtraDhcpOptMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin): agentschedulers_db.DhcpAgentSchedulerDbMixin):
@ -443,28 +666,10 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# Include the BigSwitch Extensions path in the api_extensions # Include the BigSwitch Extensions path in the api_extensions
neutron_extensions.append_api_extensions_path(extensions.__path__) neutron_extensions.append_api_extensions_path(extensions.__path__)
# 'servers' is the list of network controller REST end-points
# (used in order specified till one succeeds, and it is sticky
# till next failure). Use 'server_auth' to encode api-key
servers = cfg.CONF.RESTPROXY.servers
server_auth = cfg.CONF.RESTPROXY.server_auth
server_ssl = cfg.CONF.RESTPROXY.server_ssl
sync_data = cfg.CONF.RESTPROXY.sync_data
neutron_id = cfg.CONF.RESTPROXY.neutron_id
self.add_meta_server_route = cfg.CONF.RESTPROXY.add_meta_server_route self.add_meta_server_route = cfg.CONF.RESTPROXY.add_meta_server_route
timeout = cfg.CONF.RESTPROXY.server_timeout
if server_timeout is not None:
timeout = server_timeout
# validate config
assert servers is not None, _('Servers not defined. Aborting plugin')
servers = tuple(s.rsplit(':', 1) for s in servers.split(','))
servers = tuple((server, int(port)) for server, port in servers)
assert all(len(s) == 2 for s in servers), SYNTAX_ERROR_MESSAGE
# init network ctrl connections # init network ctrl connections
self.servers = ServerPool(servers, server_ssl, server_auth, neutron_id, self.servers = ServerPool(server_timeout, BASE_URI)
timeout, BASE_URI)
# init dhcp support # init dhcp support
self.topic = topics.PLUGIN self.topic = topics.PLUGIN
@ -482,7 +687,7 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
fanout=False) fanout=False)
# Consume from all consumers in a thread # Consume from all consumers in a thread
self.conn.consume_in_thread() self.conn.consume_in_thread()
if sync_data: if cfg.CONF.RESTPROXY.sync_data:
self._send_all_data() self._send_all_data()
LOG.debug(_("NeutronRestProxyV2: initialization done")) LOG.debug(_("NeutronRestProxyV2: initialization done"))
@ -516,19 +721,12 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
self._warn_on_state_status(network['network']) self._warn_on_state_status(network['network'])
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
# Validate args
tenant_id = self._get_tenant_id_for_create(context,
network["network"])
# create network in DB # create network in DB
new_net = super(NeutronRestProxyV2, self).create_network(context, new_net = super(NeutronRestProxyV2, self).create_network(context,
network) network)
self._process_l3_create(context, new_net, network['network']) self._process_l3_create(context, new_net, network['network'])
mapped_network = self._get_mapped_network_with_subnets(new_net,
context)
# create network on the network controller # create network on the network controller
self.servers.rest_create_network(tenant_id, mapped_network) self._send_create_network(new_net, context)
# return created network # return created network
return new_net return new_net
@ -585,7 +783,6 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# Validate args # Validate args
orig_net = super(NeutronRestProxyV2, self).get_network(context, net_id) orig_net = super(NeutronRestProxyV2, self).get_network(context, net_id)
tenant_id = orig_net["tenant_id"]
filter = {'network_id': [net_id]} filter = {'network_id': [net_id]}
ports = self.get_ports(context, filters=filter) ports = self.get_ports(context, filters=filter)
@ -600,7 +797,7 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
ret_val = super(NeutronRestProxyV2, self).delete_network(context, ret_val = super(NeutronRestProxyV2, self).delete_network(context,
net_id) net_id)
self.servers.rest_delete_network(tenant_id, net_id) self._send_delete_network(orig_net, context)
return ret_val return ret_val
def create_port(self, context, port): def create_port(self, context, port):
@ -633,7 +830,6 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# Update DB in new session so exceptions rollback changes # Update DB in new session so exceptions rollback changes
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
port["port"]["admin_state_up"] = False
dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, []) dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, [])
new_port = super(NeutronRestProxyV2, self).create_port(context, new_port = super(NeutronRestProxyV2, self).create_port(context,
port) port)
@ -647,7 +843,6 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
new_port = self._extend_port_dict_binding(context, new_port) new_port = self._extend_port_dict_binding(context, new_port)
net = super(NeutronRestProxyV2, net = super(NeutronRestProxyV2,
self).get_network(context, new_port["network_id"]) self).get_network(context, new_port["network_id"])
if self.add_meta_server_route: if self.add_meta_server_route:
if new_port['device_owner'] == 'network:dhcp': if new_port['device_owner'] == 'network:dhcp':
destination = METADATA_SERVER_IP + '/32' destination = METADATA_SERVER_IP + '/32'
@ -655,28 +850,10 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# create on network ctrl # create on network ctrl
mapped_port = self._map_state_and_status(new_port) mapped_port = self._map_state_and_status(new_port)
self.servers.rest_create_port(net, mapped_port) self.servers.rest_create_port(net["tenant_id"],
new_port["network_id"],
# connect device to network, if present mapped_port)
device_id = port["port"].get("device_id") return new_port
if device_id:
try:
self.servers.rest_plug_interface(net["tenant_id"], net["id"],
new_port, device_id)
except RemoteRestError:
with excutils.save_and_reraise_exception():
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(
context,
new_port["id"],
port_update
)
# Set port state up and return that port
port_update = {"port": {"admin_state_up": True}}
new_port = super(NeutronRestProxyV2, self).update_port(context,
new_port["id"],
port_update)
return self._extend_port_dict_binding(context, new_port)
def get_port(self, context, id, fields=None): def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
@ -732,39 +909,27 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
self).update_port(context, port_id, port) self).update_port(context, port_id, port)
self._update_extra_dhcp_opts_on_port(context, port_id, port, self._update_extra_dhcp_opts_on_port(context, port_id, port,
new_port) new_port)
ctrl_update_required = False
old_host_id = porttracker_db.get_port_hostid(context,
orig_port['id'])
if (portbindings.HOST_ID in port['port'] if (portbindings.HOST_ID in port['port']
and 'id' in new_port): and 'id' in new_port):
host_id = port['port'][portbindings.HOST_ID] host_id = port['port'][portbindings.HOST_ID]
porttracker_db.put_port_hostid(context, new_port['id'], porttracker_db.put_port_hostid(context, new_port['id'],
host_id) host_id)
new_port = self._extend_port_dict_binding(context, new_port) if old_host_id != host_id:
ctrl_update_required = True
# update on networl ctrl if (new_port.get("device_id") != orig_port.get("device_id") and
mapped_port = self._map_state_and_status(new_port) orig_port.get("device_id")):
self.servers.rest_update_port(orig_port["tenant_id"], ctrl_update_required = True
orig_port["network_id"],
mapped_port, port_id)
if (new_port.get("device_id") != orig_port.get("device_id") and if ctrl_update_required:
orig_port.get("device_id")): new_port = self._extend_port_dict_binding(context, new_port)
try: mapped_port = self._map_state_and_status(new_port)
self.servers.rest_unplug_interface(orig_port["tenant_id"], self.servers.rest_update_port(new_port["tenant_id"],
orig_port["network_id"], new_port["network_id"],
orig_port["id"]) mapped_port)
device_id = new_port.get("device_id")
if device_id:
self.rest_plug_interface(new_port["tenant_id"],
new_port["network_id"],
new_port, device_id)
except RemoteRestError:
with excutils.save_and_reraise_exception():
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(
context,
new_port["id"],
port_update
)
# return new_port # return new_port
return new_port return new_port
@ -788,28 +953,8 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
self.prevent_l3_port_deletion(context, port_id) self.prevent_l3_port_deletion(context, port_id)
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
self.disassociate_floatingips(context, port_id) self.disassociate_floatingips(context, port_id)
self._unplug_port(context, port_id)
# Separate transaction for delete in case unplug passes
# but delete fails on controller
with context.session.begin(subtransactions=True):
super(NeutronRestProxyV2, self).delete_port(context, port_id) super(NeutronRestProxyV2, self).delete_port(context, port_id)
def _unplug_port(self, context, port_id):
port = super(NeutronRestProxyV2, self).get_port(context, port_id)
tenant_id = port['tenant_id']
net_id = port['network_id']
if tenant_id == '':
net = super(NeutronRestProxyV2, self).get_network(context, net_id)
tenant_id = net['tenant_id']
if port.get("device_id"):
self.servers.rest_unplug_interface(tenant_id, net_id, port_id)
# Port should transition to error state now that it's unplugged
# but not yet deleted
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(context,
port_id,
port_update)
def _delete_port(self, context, port_id): def _delete_port(self, context, port_id):
port = super(NeutronRestProxyV2, self).get_port(context, port_id) port = super(NeutronRestProxyV2, self).get_port(context, port_id)
tenant_id = port['tenant_id'] tenant_id = port['tenant_id']
@ -1086,69 +1231,6 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# networks are detected, which isn't supported by the Plugin # networks are detected, which isn't supported by the Plugin
LOG.error(_("NeutronRestProxyV2: too many external networks")) LOG.error(_("NeutronRestProxyV2: too many external networks"))
def _send_all_data(self):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
This gives the controller an option to re-sync it's persistent store
with neutron's current view of that data.
"""
admin_context = qcontext.get_admin_context()
networks = []
routers = []
all_networks = super(NeutronRestProxyV2,
self).get_networks(admin_context) or []
for net in all_networks:
mapped_network = self._get_mapped_network_with_subnets(net)
net_fl_ips = self._get_network_with_floatingips(mapped_network)
ports = []
net_filter = {'network_id': [net.get('id')]}
net_ports = super(NeutronRestProxyV2,
self).get_ports(admin_context,
filters=net_filter) or []
for port in net_ports:
mapped_port = self._map_state_and_status(port)
mapped_port['attachment'] = {
'id': port.get('device_id'),
'mac': port.get('mac_address'),
}
ports.append(mapped_port)
net_fl_ips['ports'] = ports
networks.append(net_fl_ips)
all_routers = super(NeutronRestProxyV2,
self).get_routers(admin_context) or []
for router in all_routers:
interfaces = []
mapped_router = self._map_state_and_status(router)
router_filter = {
'device_owner': ["network:router_interface"],
'device_id': [router.get('id')]
}
router_ports = super(NeutronRestProxyV2,
self).get_ports(admin_context,
filters=router_filter) or []
for port in router_ports:
net_id = port.get('network_id')
subnet_id = port['fixed_ips'][0]['subnet_id']
intf_details = self._get_router_intf_details(admin_context,
net_id,
subnet_id)
interfaces.append(intf_details)
mapped_router['interfaces'] = interfaces
routers.append(mapped_router)
resource = '/topology'
data = {
'networks': networks,
'routers': routers,
}
errstr = _("Unable to update remote topology: %s")
return self.servers.rest_action('PUT', resource, data, errstr)
def _add_host_route(self, context, destination, port): def _add_host_route(self, context, destination, port):
subnet = {} subnet = {}
for fixed_ip in port['fixed_ips']: for fixed_ip in port['fixed_ips']:
@ -1165,131 +1247,3 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
LOG.debug(_("Adding host route: ")) LOG.debug(_("Adding host route: "))
LOG.debug(_("Destination:%(dst)s nexthop:%(next)s"), LOG.debug(_("Destination:%(dst)s nexthop:%(next)s"),
{'dst': destination, 'next': nexthop}) {'dst': destination, 'next': nexthop})
def _get_network_with_floatingips(self, network, context=None):
if context is None:
context = qcontext.get_admin_context()
net_id = network['id']
net_filter = {'floating_network_id': [net_id]}
fl_ips = super(NeutronRestProxyV2,
self).get_floatingips(context,
filters=net_filter) or []
network['floatingips'] = fl_ips
return network
def _get_all_subnets_json_for_network(self, net_id, context=None):
if context is None:
context = qcontext.get_admin_context()
# start a sub-transaction to avoid breaking parent transactions
with context.session.begin(subtransactions=True):
subnets = self._get_subnets_by_network(context,
net_id)
subnets_details = []
if subnets:
for subnet in subnets:
subnet_dict = self._make_subnet_dict(subnet)
mapped_subnet = self._map_state_and_status(subnet_dict)
subnets_details.append(mapped_subnet)
return subnets_details
def _get_mapped_network_with_subnets(self, network, context=None):
# if context is not provided, admin context is used
if context is None:
context = qcontext.get_admin_context()
network = self._map_state_and_status(network)
subnets = self._get_all_subnets_json_for_network(network['id'],
context)
network['subnets'] = subnets
for subnet in (subnets or []):
if subnet['gateway_ip']:
# FIX: For backward compatibility with wire protocol
network['gateway'] = subnet['gateway_ip']
break
else:
network['gateway'] = ''
network[external_net.EXTERNAL] = self._network_is_external(
context, network['id'])
return network
def _send_update_network(self, network, context):
net_id = network['id']
tenant_id = network['tenant_id']
# update network on network controller
mapped_network = self._get_mapped_network_with_subnets(network,
context)
net_fl_ips = self._get_network_with_floatingips(mapped_network,
context)
self.servers.rest_update_network(tenant_id, net_id, net_fl_ips)
def _map_state_and_status(self, resource):
resource = copy.copy(resource)
resource['state'] = ('UP' if resource.pop('admin_state_up',
True) else 'DOWN')
if 'status' in resource:
del resource['status']
return resource
def _warn_on_state_status(self, resource):
if resource.get('admin_state_up', True) is False:
LOG.warning(_("Setting admin_state_up=False is not supported"
" in this plugin version. Ignoring setting for "
"resource: %s"), resource)
if 'status' in resource:
if resource['status'] is not const.NET_STATUS_ACTIVE:
LOG.warning(_("Operational status is internally set by the"
" plugin. Ignoring setting status=%s."),
resource['status'])
def _get_router_intf_details(self, context, intf_id, subnet_id):
# we will use the network id as interface's id
net_id = intf_id
network = super(NeutronRestProxyV2, self).get_network(context,
net_id)
subnet = super(NeutronRestProxyV2, self).get_subnet(context,
subnet_id)
mapped_network = self._get_mapped_network_with_subnets(network)
mapped_subnet = self._map_state_and_status(subnet)
data = {
'id': intf_id,
"network": mapped_network,
"subnet": mapped_subnet
}
return data
def _extend_port_dict_binding(self, context, port):
cfg_vif_type = cfg.CONF.NOVA.vif_type.lower()
if not cfg_vif_type in (portbindings.VIF_TYPE_OVS,
portbindings.VIF_TYPE_IVS):
LOG.warning(_("Unrecognized vif_type in configuration "
"[%s]. Defaulting to ovs. "),
cfg_vif_type)
cfg_vif_type = portbindings.VIF_TYPE_OVS
hostid = porttracker_db.get_port_hostid(context,
port['id'])
if hostid:
override = self._check_hostvif_override(hostid)
if override:
cfg_vif_type = override
port[portbindings.VIF_TYPE] = cfg_vif_type
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
return port
def _check_hostvif_override(self, hostid):
for v in cfg.CONF.NOVA.vif_types:
if hostid in getattr(cfg.CONF.NOVA, "node_override_vif_" + v, []):
return v
return False

View File

@ -0,0 +1,104 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2014 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
from oslo.config import cfg
from neutron import context as ctx
from neutron.extensions import portbindings
from neutron.openstack.common import log
from neutron.plugins.bigswitch.db import porttracker_db
from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
from neutron.plugins.bigswitch.plugin import ServerPool
from neutron.plugins.ml2 import driver_api as api
LOG = log.getLogger(__name__)
class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
api.MechanismDriver):
"""Mechanism Driver for Big Switch Networks Controller.
This driver relays the network create, update, delete
operations to the Big Switch Controller.
"""
def initialize(self, server_timeout=None):
LOG.debug(_('Initializing driver'))
# backend doesn't support bulk operations yet
self.native_bulk_support = False
# init network ctrl connections
self.servers = ServerPool(server_timeout)
self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
LOG.debug(_("Initialization done"))
def create_network_postcommit(self, context):
# create network on the network controller
self._send_create_network(context.current)
def update_network_postcommit(self, context):
# update network on the network controller
self._send_update_network(context.current)
def delete_network_postcommit(self, context):
# delete network on the network controller
self._send_delete_network(context.current)
def create_port_postcommit(self, context):
# create port on the network controller
port = self._prepare_port_for_controller(context)
if port:
self.servers.rest_create_port(port["network"]["tenant_id"],
port["network"]["id"], port)
def update_port_postcommit(self, context):
# update port on the network controller
port = self._prepare_port_for_controller(context)
if port:
self.servers.rest_update_port(port["network"]["tenant_id"],
port["network"]["id"], port)
def delete_port_postcommit(self, context):
# delete port on the network controller
port = context.current
net = context.network.current
self.servers.rest_delete_port(net["tenant_id"], net["id"], port['id'])
def _prepare_port_for_controller(self, context):
port = context.current
net = context.network.current
port['network'] = net
port['binding_host'] = context._binding.host
actx = ctx.get_admin_context()
if (portbindings.HOST_ID in port and 'id' in port):
host_id = port[portbindings.HOST_ID]
porttracker_db.put_port_hostid(actx, port['id'], host_id)
else:
host_id = ''
prepped_port = self._extend_port_dict_binding(actx, port)
prepped_port = self._map_state_and_status(prepped_port)
if (portbindings.HOST_ID not in prepped_port or
prepped_port[portbindings.HOST_ID] == ''):
# in ML2, controller doesn't care about ports without
# the host_id set
return False
return prepped_port

View File

@ -22,7 +22,6 @@ import webob.exc
from neutron import context from neutron import context
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron.manager import NeutronManager from neutron.manager import NeutronManager
from neutron.plugins.bigswitch.plugin import RemoteRestError
from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.bigswitch import fake_server from neutron.tests.unit.bigswitch import fake_server
from neutron.tests.unit.bigswitch import test_base from neutron.tests.unit.bigswitch import test_base
@ -33,9 +32,11 @@ import neutron.tests.unit.test_db_plugin as test_plugin
class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase, class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
test_plugin.NeutronDbPluginV2TestCase): test_plugin.NeutronDbPluginV2TestCase):
def setUp(self): def setUp(self, plugin_name=None):
self.setup_config_files() self.setup_config_files()
self.setup_patches() self.setup_patches()
if plugin_name:
self._plugin_name = plugin_name
super(BigSwitchProxyPluginV2TestCase, super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name) self).setUp(self._plugin_name)
@ -86,29 +87,12 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
#failure to create should result in no ports #failure to create should result in no ports
self.assertEqual(0, len(ports)) self.assertEqual(0, len(ports))
def test_rollback_on_port_attach(self):
with self.network() as n:
plugin_obj = NeutronManager.get_plugin()
with patch.object(plugin_obj.servers,
'rest_plug_interface') as mock_plug_interface:
mock_plug_interface.side_effect = RemoteRestError(
reason='fake error')
kwargs = {'device_id': 'somedevid',
'tenant_id': n['network']['tenant_id']}
self._create_port('json', n['network']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code,
**kwargs)
port = self._get_ports(n['network']['id'])[0]
# Attachment failure should leave created port in error state
self.assertEqual('ERROR', port['status'])
self._delete('ports', port['id'])
def test_rollback_for_port_update(self): def test_rollback_for_port_update(self):
with self.network() as n: with self.network() as n:
with self.port(network_id=n['network']['id']) as port: with self.port(network_id=n['network']['id'],
device_id='66') as port:
port = self._get_ports(n['network']['id'])[0] port = self._get_ports(n['network']['id'])[0]
data = {'port': {'name': 'aNewName'}} data = {'port': {'name': 'aNewName', 'device_id': '99'}}
self.httpPatch = patch('httplib.HTTPConnection', create=True, self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500) new=fake_server.HTTPConnectionMock500)
self.httpPatch.start() self.httpPatch.start()
@ -120,7 +104,7 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
# name should have stayed the same # name should have stayed the same
self.assertEqual(port['name'], uport['name']) self.assertEqual(port['name'], uport['name'])
def test_rollback_for_port_detach(self): def test_rollback_for_port_delete(self):
with self.network() as n: with self.network() as n:
with self.port(network_id=n['network']['id'], with self.port(network_id=n['network']['id'],
device_id='somedevid') as port: device_id='somedevid') as port:
@ -134,22 +118,6 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
port = self._get_ports(n['network']['id'])[0] port = self._get_ports(n['network']['id'])[0]
self.assertEqual('ACTIVE', port['status']) self.assertEqual('ACTIVE', port['status'])
def test_rollback_for_port_delete(self):
with self.network() as n:
with self.port(network_id=n['network']['id'],
device_id='somdevid') as port:
plugin_obj = NeutronManager.get_plugin()
with patch.object(plugin_obj.servers,
'rest_delete_port'
) as mock_plug_interface:
mock_plug_interface.side_effect = RemoteRestError(
reason='fake error')
self._delete('ports', port['port']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code)
port = self._get_ports(n['network']['id'])[0]
self.assertEqual('ERROR', port['status'])
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2, class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
BigSwitchProxyPluginV2TestCase, BigSwitchProxyPluginV2TestCase,

View File

@ -0,0 +1,95 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import webob.exc
from neutron.extensions import portbindings
from neutron.plugins.ml2 import config as ml2_config
from neutron.plugins.ml2.drivers import type_vlan as vlan_config
import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
from neutron.tests.unit.ml2.test_ml2_plugin import PLUGIN_NAME as ML2_PLUGIN
from neutron.tests.unit import test_db_plugin
PHYS_NET = 'physnet1'
VLAN_START = 1000
VLAN_END = 1100
class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
def setUp(self):
# Configure the ML2 mechanism drivers and network types
ml2_opts = {
'mechanism_drivers': ['bigswitch'],
'tenant_network_types': ['vlan'],
}
for opt, val in ml2_opts.items():
ml2_config.cfg.CONF.set_override(opt, val, 'ml2')
self.addCleanup(ml2_config.cfg.CONF.reset)
# Configure the ML2 VLAN parameters
phys_vrange = ':'.join([PHYS_NET, str(VLAN_START), str(VLAN_END)])
vlan_config.cfg.CONF.set_override('network_vlan_ranges',
[phys_vrange],
'ml2_type_vlan')
self.addCleanup(vlan_config.cfg.CONF.reset)
super(TestBigSwitchMechDriverBase,
self).setUp(ML2_PLUGIN)
class TestBigSwitchMechDriverNetworksV2(test_db_plugin.TestNetworksV2,
TestBigSwitchMechDriverBase):
pass
class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
TestBigSwitchMechDriverBase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
def setUp(self):
super(TestBigSwitchMechDriverPortsV2, self).setUp()
self.port_create_status = 'DOWN'
def test_update_port_status_build(self):
with self.port() as port:
self.assertEqual(port['port']['status'], 'DOWN')
self.assertEqual(self.port_create_status, 'DOWN')
# exercise the host_id tracking code
def test_port_vif_details(self):
kwargs = {'name': 'name', 'binding:host_id': 'ivshost',
'device_id': 'override_dev'}
with self.port(**kwargs) as port:
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_IVS)
kwargs = {'name': 'name2', 'binding:host_id': 'someotherhost',
'device_id': 'other_dev'}
with self.port(**kwargs) as port:
self.assertEqual(port['port']['binding:vif_type'], self.VIF_TYPE)
def _make_port(self, fmt, net_id, expected_res_status=None, arg_list=None,
**kwargs):
arg_list = arg_list or ()
arg_list += ('binding:host_id', )
res = self._create_port(fmt, net_id, expected_res_status,
arg_list, **kwargs)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)

View File

@ -58,6 +58,7 @@ data_files =
etc/neutron/plugins/ml2/ml2_conf.ini etc/neutron/plugins/ml2/ml2_conf.ini
etc/neutron/plugins/ml2/ml2_conf_arista.ini etc/neutron/plugins/ml2/ml2_conf_arista.ini
etc/neutron/plugins/ml2/ml2_conf_cisco.ini etc/neutron/plugins/ml2/ml2_conf_cisco.ini
etc/neutron/plugins/bigswitch/restproxy.ini
etc/neutron/plugins/mlnx = etc/neutron/plugins/mlnx/mlnx_conf.ini etc/neutron/plugins/mlnx = etc/neutron/plugins/mlnx/mlnx_conf.ini
etc/neutron/plugins/nec = etc/neutron/plugins/nec/nec.ini etc/neutron/plugins/nec = etc/neutron/plugins/nec/nec.ini
etc/neutron/plugins/nicira = etc/neutron/plugins/nicira/nvp.ini etc/neutron/plugins/nicira = etc/neutron/plugins/nicira/nvp.ini
@ -158,6 +159,7 @@ neutron.ml2.mechanism_drivers =
arista = neutron.plugins.ml2.drivers.mech_arista.mechanism_arista:AristaDriver arista = neutron.plugins.ml2.drivers.mech_arista.mechanism_arista:AristaDriver
cisco_nexus = neutron.plugins.ml2.drivers.cisco.mech_cisco_nexus:CiscoNexusMechanismDriver cisco_nexus = neutron.plugins.ml2.drivers.cisco.mech_cisco_nexus:CiscoNexusMechanismDriver
l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
bigswitch = neutron.plugins.ml2.drivers.mech_bigswitch.driver:BigSwitchMechanismDriver
[build_sphinx] [build_sphinx]
all_files = 1 all_files = 1