Refactor BigSwitch error handling to use db rollbacks

This patch adjusts most of the logic handling the
calls to the controller in the BigSwitch/floodlight
plugin to make use of the db rollbacks from
sqlalchemy for free on exceptions. This eliminates
several complex try-except blocks and makes
maintaining db<->controller consistency easier.

Fixes: bug #1215823
Change-Id: Ia636c40e744b3b1c543e891791bf492df4f675d2
This commit is contained in:
Kevin Benton 2013-08-22 20:17:00 -07:00
parent 09dad2f1fe
commit f3215f8d90
4 changed files with 615 additions and 550 deletions

View File

@ -68,6 +68,7 @@ from neutron.db import l3_db
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import l3
from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.bigswitch.db import porttracker_db
@ -302,6 +303,10 @@ class ServerPool(object):
'server': (active_server.server,
active_server.port),
'response': ret[3]})
LOG.error(_("ServerProxy: Error details: status=%(status)d, "
"reason=%(reason)r, ret=%(ret)s, data=%(data)r"),
{'status': ret[0], 'reason': ret[1], 'ret': ret[2],
'data': ret[3]})
active_server.failed = True
# All servers failed, reset server list and try again next time
@ -312,17 +317,106 @@ class ServerPool(object):
s.port) for s in self.servers)})
return (0, None, None, None)
def get(self, resource, data='', headers=None, ignore_codes=[]):
return self.rest_call('GET', resource, data, headers, ignore_codes)
def rest_action(self, action, resource, data='', errstr='%s',
ignore_codes=[], headers=None):
"""
Wrapper for rest_call that verifies success and raises a
RemoteRestError on failure with a provided error string
By default, 404 errors on DELETE calls are ignored because
they already do not exist on the backend.
"""
if not ignore_codes and action == 'DELETE':
ignore_codes = [404]
resp = self.rest_call(action, resource, data, headers, ignore_codes)
if self.server_failure(resp, ignore_codes):
LOG.error(_("NeutronRestProxyV2: ") + errstr, resp[2])
raise RemoteRestError(resp[2])
if resp[0] in ignore_codes:
LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
"code %(code)d on %(action)s action to resource "
"%(resource)s"),
{'code': resp[2], 'action': action,
'resource': resource})
return resp
def put(self, resource, data, headers=None, ignore_codes=[]):
return self.rest_call('PUT', resource, data, headers, ignore_codes)
def rest_create_router(self, tenant_id, router):
resource = ROUTER_RESOURCE_PATH % tenant_id
data = {"router": router}
errstr = _("Unable to create remote router: %s")
self.rest_action('POST', resource, data, errstr)
def post(self, resource, data, headers=None, ignore_codes=[]):
return self.rest_call('POST', resource, data, headers, ignore_codes)
def rest_update_router(self, tenant_id, router, router_id):
resource = ROUTERS_PATH % (tenant_id, router_id)
data = {"router": router}
errstr = _("Unable to update remote router: %s")
self.rest_action('PUT', resource, data, errstr)
def delete(self, resource, data='', headers=None, ignore_codes=[]):
return self.rest_call('DELETE', resource, data, headers, ignore_codes)
def rest_delete_router(self, tenant_id, router_id):
resource = ROUTERS_PATH % (tenant_id, router_id)
errstr = _("Unable to delete remote router: %s")
self.rest_action('DELETE', resource, errstr=errstr)
def rest_add_router_interface(self, tenant_id, router_id, intf_details):
resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
data = {"interface": intf_details}
errstr = _("Unable to add router interface: %s")
self.rest_action('POST', resource, data, errstr)
def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
errstr = _("Unable to delete remote intf: %s")
self.rest_action('DELETE', resource, errstr=errstr)
def rest_create_network(self, tenant_id, network):
resource = NET_RESOURCE_PATH % tenant_id
data = {"network": network}
errstr = _("Unable to create remote network: %s")
self.rest_action('POST', resource, data, errstr)
def rest_update_network(self, tenant_id, net_id, network):
resource = NETWORKS_PATH % (tenant_id, net_id)
data = {"network": network}
errstr = _("Unable to update remote network: %s")
self.rest_action('PUT', resource, data, errstr)
def rest_delete_network(self, tenant_id, net_id):
resource = NETWORKS_PATH % (tenant_id, net_id)
errstr = _("Unable to update remote network: %s")
self.rest_action('DELETE', resource, errstr=errstr)
def rest_create_port(self, net, port):
resource = PORT_RESOURCE_PATH % (net["tenant_id"], net["id"])
data = {"port": port}
errstr = _("Unable to create remote port: %s")
self.rest_action('POST', resource, data, errstr)
def rest_update_port(self, tenant_id, network_id, port, port_id):
resource = PORTS_PATH % (tenant_id, network_id, port_id)
data = {"port": port}
errstr = _("Unable to update remote port: %s")
self.rest_action('PUT', resource, data, errstr)
def rest_delete_port(self, tenant_id, network_id, port_id):
resource = PORTS_PATH % (tenant_id, network_id, port_id)
errstr = _("Unable to delete remote port: %s")
self.rest_action('DELETE', resource, errstr=errstr)
def rest_plug_interface(self, tenant_id, net_id, port,
remote_interface_id):
if port["mac_address"] is not None:
resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
data = {"attachment":
{"id": remote_interface_id,
"mac": port["mac_address"],
}
}
errstr = _("Unable to plug in interface: %s")
self.rest_action('PUT', resource, data, errstr)
def rest_unplug_interface(self, tenant_id, net_id, port_id):
resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
errstr = _("Unable to unplug interface: %s")
self.rest_action('DELETE', resource, errstr=errstr)
class RpcProxy(dhcp_rpc_base.DhcpRpcCallbackMixin):
@ -413,32 +507,20 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
self._warn_on_state_status(network['network'])
with context.session.begin(subtransactions=True):
# Validate args
tenant_id = self._get_tenant_id_for_create(context, network["network"])
tenant_id = self._get_tenant_id_for_create(context,
network["network"])
session = context.session
with session.begin(subtransactions=True):
# create network in DB
new_net = super(NeutronRestProxyV2, self).create_network(context,
network)
self._process_l3_create(context, new_net, network['network'])
mapped_network = self._get_mapped_network_with_subnets(new_net,
context)
# create network on the network controller
try:
resource = NET_RESOURCE_PATH % tenant_id
mapped_network = self._get_mapped_network_with_subnets(new_net)
data = {
"network": mapped_network
}
ret = self.servers.post(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2:Unable to create remote "
"network: %s"), e.message)
super(NeutronRestProxyV2, self).delete_network(context,
new_net['id'])
raise
self.servers.rest_create_network(tenant_id, mapped_network)
# return created network
return new_net
@ -472,25 +554,12 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
session = context.session
with session.begin(subtransactions=True):
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
new_net = super(NeutronRestProxyV2, self).update_network(context,
net_id,
network)
new_net = super(NeutronRestProxyV2, self).update_network(
context, net_id, network)
self._process_l3_update(context, new_net, network['network'])
# update network on network controller
try:
self._send_update_network(new_net)
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote "
"network: %s"), e.message)
# reset network to original state
super(NeutronRestProxyV2, self).update_network(context, id,
orig_net)
raise
# return updated network
self._send_update_network(new_net, context)
return new_net
def delete_network(self, context, net_id):
@ -520,20 +589,11 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
if not only_auto_del:
raise exceptions.NetworkInUse(net_id=net_id)
# delete from network ctrl. Remote error on delete is ignored
try:
resource = NETWORKS_PATH % (tenant_id, net_id)
ret = self.servers.delete(resource)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
with context.session.begin(subtransactions=True):
ret_val = super(NeutronRestProxyV2, self).delete_network(context,
net_id)
self.servers.rest_delete_network(tenant_id, net_id)
return ret_val
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote "
"network: %s"), e.message)
raise
def create_port(self, context, port):
"""Create a port, which is a connection point of a device
@ -563,15 +623,19 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
"""
LOG.debug(_("NeutronRestProxyV2: create_port() called"))
# Update DB
# Update DB in new session so exceptions rollback changes
with context.session.begin(subtransactions=True):
port["port"]["admin_state_up"] = False
dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, [])
new_port = super(NeutronRestProxyV2, self).create_port(context, port)
new_port = super(NeutronRestProxyV2, self).create_port(context,
port)
if (portbindings.HOST_ID in port['port']
and 'id' in new_port):
host_id = port['port'][portbindings.HOST_ID]
porttracker_db.put_port_hostid(context, new_port['id'],
port['port'][portbindings.HOST_ID])
self._process_port_create_extra_dhcp_opts(context, new_port, dhcp_opts)
host_id)
self._process_port_create_extra_dhcp_opts(context, new_port,
dhcp_opts)
new_port = self._extend_port_dict_binding(context, new_port)
net = super(NeutronRestProxyV2,
self).get_network(context, new_port["network_id"])
@ -581,30 +645,24 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
destination = METADATA_SERVER_IP + '/32'
self._add_host_route(context, destination, new_port)
# create on networl ctrl
try:
resource = PORT_RESOURCE_PATH % (net["tenant_id"], net["id"])
# create on network ctrl
mapped_port = self._map_state_and_status(new_port)
data = {
"port": mapped_port
}
ret = self.servers.post(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
self.servers.rest_create_port(net, mapped_port)
# connect device to network, if present
device_id = port["port"].get("device_id")
if device_id:
self._plug_interface(context,
net["tenant_id"], net["id"],
new_port["id"], device_id)
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to create remote port: "
"%s"), e.message)
super(NeutronRestProxyV2, self).delete_port(context,
new_port["id"])
raise
try:
self.servers.rest_plug_interface(net["tenant_id"], net["id"],
new_port, device_id)
except RemoteRestError:
with excutils.save_and_reraise_exception():
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(
context,
new_port["id"],
port_update
)
# Set port state up and return that port
port_update = {"port": {"admin_state_up": True}}
new_port = super(NeutronRestProxyV2, self).update_port(context,
@ -660,44 +718,45 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# Validate Args
orig_port = super(NeutronRestProxyV2, self).get_port(context, port_id)
with context.session.begin(subtransactions=True):
# Update DB
new_port = super(NeutronRestProxyV2, self).update_port(context,
port_id, port)
self._update_extra_dhcp_opts_on_port(context, port_id, port, new_port)
new_port = super(NeutronRestProxyV2,
self).update_port(context, port_id, port)
self._update_extra_dhcp_opts_on_port(context, port_id, port,
new_port)
if (portbindings.HOST_ID in port['port']
and 'id' in new_port):
host_id = port['port'][portbindings.HOST_ID]
porttracker_db.put_port_hostid(context, new_port['id'],
port['port'][portbindings.HOST_ID])
host_id)
new_port = self._extend_port_dict_binding(context, new_port)
# update on networl ctrl
try:
resource = PORTS_PATH % (orig_port["tenant_id"],
orig_port["network_id"], port_id)
mapped_port = self._map_state_and_status(new_port)
data = {"port": mapped_port}
ret = self.servers.put(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
if new_port.get("device_id") != orig_port.get("device_id"):
if orig_port.get("device_id"):
self._unplug_interface(context, orig_port["tenant_id"],
# update on networl ctrl
mapped_port = self._map_state_and_status(new_port)
self.servers.rest_update_port(orig_port["tenant_id"],
orig_port["network_id"],
mapped_port, port_id)
if (new_port.get("device_id") != orig_port.get("device_id") and
orig_port.get("device_id")):
try:
self.servers.rest_unplug_interface(orig_port["tenant_id"],
orig_port["network_id"],
orig_port["id"])
device_id = new_port.get("device_id")
if device_id:
self._plug_interface(context, new_port["tenant_id"],
self.rest_plug_interface(new_port["tenant_id"],
new_port["network_id"],
new_port["id"], device_id)
new_port, device_id)
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to create remote port: "
"%s"), e.message)
# reset port to original state
super(NeutronRestProxyV2, self).update_port(context, port_id,
orig_port)
raise
except RemoteRestError:
with excutils.save_and_reraise_exception():
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(
context,
new_port["id"],
port_update
)
# return new_port
return new_port
@ -719,115 +778,57 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
# and l3-router. If so, we should prevent deletion.
if l3_port_check:
self.prevent_l3_port_deletion(context, port_id)
with context.session.begin(subtransactions=True):
self.disassociate_floatingips(context, port_id)
self._unplug_port(context, port_id)
# Separate transaction for delete in case unplug passes
# but delete fails on controller
with context.session.begin(subtransactions=True):
super(NeutronRestProxyV2, self).delete_port(context, port_id)
def _delete_port(self, context, port_id):
# Delete from DB
def _unplug_port(self, context, port_id):
port = super(NeutronRestProxyV2, self).get_port(context, port_id)
tenant_id = port['tenant_id']
net_id = port['network_id']
if tenant_id == '':
net = super(NeutronRestProxyV2,
self).get_network(context, port['network_id'])
net = super(NeutronRestProxyV2, self).get_network(context, net_id)
tenant_id = net['tenant_id']
# delete from network ctrl. Remote error on delete is ignored
try:
resource = PORTS_PATH % (tenant_id, port["network_id"],
port_id)
ret = self.servers.delete(resource)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
if port.get("device_id"):
self._unplug_interface(context, tenant_id,
port["network_id"], port["id"])
ret_val = super(NeutronRestProxyV2, self)._delete_port(context,
port_id)
return ret_val
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote port: "
"%s"), e.message)
raise
self.servers.rest_unplug_interface(tenant_id, net_id, port_id)
# Port should transition to error state now that it's unplugged
# but not yet deleted
port_update = {"port": {"status": "ERROR"}}
super(NeutronRestProxyV2, self).update_port(context,
port_id,
port_update)
def _plug_interface(self, context, tenant_id, net_id, port_id,
remote_interface_id):
"""Plug remote interface to the network.
Attaches a remote interface to the specified port on the specified
Virtual Network.
:returns: None
:raises: exceptions.NetworkNotFound
:raises: exceptions.PortNotFound
:raises: RemoteRestError
"""
LOG.debug(_("NeutronRestProxyV2: _plug_interface() called"))
# update attachment on network controller
try:
def _delete_port(self, context, port_id):
port = super(NeutronRestProxyV2, self).get_port(context, port_id)
mac = port["mac_address"]
if mac is not None:
resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
data = {"attachment":
{"id": remote_interface_id,
"mac": mac,
}
}
ret = self.servers.put(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2:Unable to update remote network: "
"%s"), e.message)
raise
def _unplug_interface(self, context, tenant_id, net_id, port_id):
"""Detach interface from the network controller.
Detaches a remote interface from the specified port on the network
controller.
:returns: None
:raises: RemoteRestError
"""
LOG.debug(_("NeutronRestProxyV2: _unplug_interface() called"))
# delete from network ctrl. Remote error on delete is ignored
try:
resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
ret = self.servers.delete(resource, ignore_codes=[404])
if self.servers.server_failure(ret, ignore_codes=[404]):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote port: "
"%s"), e.message)
raise
tenant_id = port['tenant_id']
net_id = port['network_id']
if tenant_id == '':
net = super(NeutronRestProxyV2, self).get_network(context, net_id)
tenant_id = net['tenant_id']
# Delete from DB
ret_val = super(NeutronRestProxyV2,
self)._delete_port(context, port_id)
self.servers.rest_delete_port(tenant_id, net_id, port_id)
return ret_val
def create_subnet(self, context, subnet):
LOG.debug(_("NeutronRestProxyV2: create_subnet() called"))
self._warn_on_state_status(subnet['subnet'])
with context.session.begin(subtransactions=True):
# create subnet in DB
new_subnet = super(NeutronRestProxyV2, self).create_subnet(context,
subnet)
new_subnet = super(NeutronRestProxyV2,
self).create_subnet(context, subnet)
net_id = new_subnet['network_id']
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
orig_net = super(NeutronRestProxyV2,
self).get_network(context, net_id)
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError:
# rollback creation of subnet
super(NeutronRestProxyV2, self).delete_subnet(context,
subnet['id'])
raise
return new_subnet
def update_subnet(self, context, id, subnet):
@ -835,38 +836,28 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
self._warn_on_state_status(subnet['subnet'])
orig_subnet = super(NeutronRestProxyV2, self)._get_subnet(context, id)
with context.session.begin(subtransactions=True):
# update subnet in DB
new_subnet = super(NeutronRestProxyV2, self).update_subnet(context, id,
subnet)
new_subnet = super(NeutronRestProxyV2,
self).update_subnet(context, id, subnet)
net_id = new_subnet['network_id']
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
orig_net = super(NeutronRestProxyV2,
self).get_network(context, net_id)
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError:
# rollback updation of subnet
super(NeutronRestProxyV2, self).update_subnet(context, id,
orig_subnet)
raise
return new_subnet
def delete_subnet(self, context, id):
LOG.debug(_("NeutronRestProxyV2: delete_subnet() called"))
orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id)
net_id = orig_subnet['network_id']
with context.session.begin(subtransactions=True):
# delete subnet in DB
super(NeutronRestProxyV2, self).delete_subnet(context, id)
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
# update network on network controller
try:
# update network on network controller - exception will rollback
self._send_update_network(orig_net)
except RemoteRestError:
# TODO(Sumit): rollback deletion of subnet
raise
def _get_tenant_default_router_rules(self, tenant):
rules = cfg.CONF.ROUTER.tenant_default_router_rule
@ -905,26 +896,12 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
rules = self._get_tenant_default_router_rules(tenant_id)
router['router']['router_rules'] = rules
with context.session.begin(subtransactions=True):
# create router in DB
new_router = super(NeutronRestProxyV2, self).create_router(context,
router)
# create router on the network controller
try:
resource = ROUTER_RESOURCE_PATH % tenant_id
mapped_router = self._map_state_and_status(new_router)
data = {
"router": mapped_router
}
ret = self.servers.post(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to create remote router: "
"%s"), e.message)
super(NeutronRestProxyV2, self).delete_router(context,
new_router['id'])
raise
self.servers.rest_create_router(tenant_id, mapped_router)
# return created router
return new_router
@ -938,28 +915,13 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
orig_router = super(NeutronRestProxyV2, self).get_router(context,
router_id)
tenant_id = orig_router["tenant_id"]
new_router = super(NeutronRestProxyV2, self).update_router(context,
router_id,
router)
with context.session.begin(subtransactions=True):
new_router = super(NeutronRestProxyV2,
self).update_router(context, router_id, router)
router = self._map_state_and_status(new_router)
# update router on network controller
try:
resource = ROUTERS_PATH % (tenant_id, router_id)
mapped_router = self._map_state_and_status(new_router)
data = {
"router": mapped_router
}
ret = self.servers.put(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote router: "
"%s"), e.message)
# reset router to original state
super(NeutronRestProxyV2, self).update_router(context,
router_id,
orig_router)
raise
self.servers.rest_update_router(tenant_id, router, router_id)
# return updated router
return new_router
@ -985,20 +947,12 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=router_id)
ret_val = super(NeutronRestProxyV2,
self).delete_router(context, router_id)
# delete from network ctrl. Remote error on delete is ignored
try:
resource = ROUTERS_PATH % (tenant_id, router_id)
ret = self.servers.delete(resource)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
ret_val = super(NeutronRestProxyV2, self).delete_router(context,
router_id)
# delete from network ctrl
self.servers.rest_delete_router(tenant_id, router_id)
return ret_val
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to delete remote router: "
"%s"), e.message)
raise
def add_router_interface(self, context, router_id, interface_info):
@ -1008,14 +962,15 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
router = self._get_router(context, router_id)
tenant_id = router['tenant_id']
with context.session.begin(subtransactions=True):
# create interface in DB
new_interface_info = super(NeutronRestProxyV2,
new_intf_info = super(NeutronRestProxyV2,
self).add_router_interface(context,
router_id,
interface_info)
port = self._get_port(context, new_interface_info['port_id'])
port = self._get_port(context, new_intf_info['port_id'])
net_id = port['network_id']
subnet_id = new_interface_info['subnet_id']
subnet_id = new_intf_info['subnet_id']
# we will use the port's network id as interface's id
interface_id = net_id
intf_details = self._get_router_intf_details(context,
@ -1023,21 +978,9 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
subnet_id)
# create interface on the network controller
try:
resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
data = {"interface": intf_details}
ret = self.servers.post(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to create interface: "
"%s"), e.message)
super(NeutronRestProxyV2,
self).remove_router_interface(context, router_id,
interface_info)
raise
return new_interface_info
self.servers.rest_add_router_interface(tenant_id, router_id,
intf_details)
return new_intf_info
def remove_router_interface(self, context, router_id, interface_info):
@ -1061,29 +1004,22 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
msg = "Either subnet_id or port_id must be specified"
raise exceptions.BadRequest(resource='router', msg=msg)
with context.session.begin(subtransactions=True):
# remove router in DB
del_intf_info = super(NeutronRestProxyV2,
del_ret = super(NeutronRestProxyV2,
self).remove_router_interface(context,
router_id,
interface_info)
# create router on the network controller
try:
resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
ret = self.servers.delete(resource)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2:Unable to delete remote intf: "
"%s"), e.message)
raise
# return new interface
return del_intf_info
self.servers.rest_remove_router_interface(tenant_id, router_id,
interface_id)
return del_ret
def create_floatingip(self, context, floatingip):
LOG.debug(_("NeutronRestProxyV2: create_floatingip() called"))
with context.session.begin(subtransactions=True):
# create floatingip in DB
new_fl_ip = super(NeutronRestProxyV2,
self).create_floatingip(context, floatingip)
@ -1095,21 +1031,17 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
try:
self._send_update_network(orig_net)
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to create remote "
"floatin IP: %s"), e.message)
super(NeutronRestProxyV2, self).delete_floatingip(context,
floatingip)
raise
with excutils.save_and_reraise_exception():
LOG.error(
_("NeutronRestProxyV2: Unable to create remote "
"floating IP: %s"), e)
# return created floating IP
return new_fl_ip
def update_floatingip(self, context, id, floatingip):
LOG.debug(_("NeutronRestProxyV2: update_floatingip() called"))
orig_fl_ip = super(NeutronRestProxyV2, self).get_floatingip(context,
id)
with context.session.begin(subtransactions=True):
# update floatingip in DB
new_fl_ip = super(NeutronRestProxyV2,
self).update_floatingip(context, id, floatingip)
@ -1118,13 +1050,7 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError:
# rollback updation of subnet
super(NeutronRestProxyV2, self).update_floatingip(context, id,
orig_fl_ip)
raise
return new_fl_ip
def delete_floatingip(self, context, id):
@ -1132,6 +1058,7 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
orig_fl_ip = super(NeutronRestProxyV2, self).get_floatingip(context,
id)
with context.session.begin(subtransactions=True):
# delete floating IP in DB
net_id = orig_fl_ip['floating_network_id']
super(NeutronRestProxyV2, self).delete_floatingip(context, id)
@ -1139,11 +1066,7 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
orig_net = super(NeutronRestProxyV2, self).get_network(context,
net_id)
# update network on network controller
try:
self._send_update_network(orig_net)
except RemoteRestError:
# TODO(Sumit): rollback deletion of floating IP
raise
def _send_all_data(self):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
@ -1200,20 +1123,13 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
routers.append(mapped_router)
try:
resource = '/topology'
data = {
'networks': networks,
'routers': routers,
}
ret = self.servers.put(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
return ret
except RemoteRestError as e:
LOG.error(_('NeutronRestProxy: Unable to update remote '
'topology: %s'), e.message)
raise
errstr = _("Unable to update remote topology: %s")
return self.servers.rest_action('PUT', resource, data, errstr)
def _add_host_route(self, context, destination, port):
subnet = {}
@ -1232,21 +1148,25 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
LOG.debug("destination:%s nexthop:%s" % (destination,
nexthop))
def _get_network_with_floatingips(self, network):
admin_context = qcontext.get_admin_context()
def _get_network_with_floatingips(self, network, context=None):
if context is None:
context = qcontext.get_admin_context()
net_id = network['id']
net_filter = {'floating_network_id': [net_id]}
fl_ips = super(NeutronRestProxyV2,
self).get_floatingips(admin_context,
self).get_floatingips(context,
filters=net_filter) or []
network['floatingips'] = fl_ips
return network
def _get_all_subnets_json_for_network(self, net_id):
admin_context = qcontext.get_admin_context()
subnets = self._get_subnets_by_network(admin_context,
def _get_all_subnets_json_for_network(self, net_id, context=None):
if context is None:
context = qcontext.get_admin_context()
# start a sub-transaction to avoid breaking parent transactions
with context.session.begin(subtransactions=True):
subnets = self._get_subnets_by_network(context,
net_id)
subnets_details = []
if subnets:
@ -1257,10 +1177,13 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
return subnets_details
def _get_mapped_network_with_subnets(self, network):
admin_context = qcontext.get_admin_context()
def _get_mapped_network_with_subnets(self, network, context=None):
# if context is not provided, admin context is used
if context is None:
context = qcontext.get_admin_context()
network = self._map_state_and_status(network)
subnets = self._get_all_subnets_json_for_network(network['id'])
subnets = self._get_all_subnets_json_for_network(network['id'],
context)
network['subnets'] = subnets
for subnet in (subnets or []):
if subnet['gateway_ip']:
@ -1269,30 +1192,20 @@ class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
break
else:
network['gateway'] = ''
network[l3.EXTERNAL] = self._network_is_external(admin_context,
network[l3.EXTERNAL] = self._network_is_external(context,
network['id'])
return network
def _send_update_network(self, network):
def _send_update_network(self, network, context=None):
net_id = network['id']
tenant_id = network['tenant_id']
# update network on network controller
try:
resource = NETWORKS_PATH % (tenant_id, net_id)
mapped_network = self._get_mapped_network_with_subnets(network)
net_fl_ips = self._get_network_with_floatingips(mapped_network)
data = {
"network": net_fl_ips,
}
ret = self.servers.put(resource, data)
if not self.servers.action_success(ret):
raise RemoteRestError(ret[2])
except RemoteRestError as e:
LOG.error(_("NeutronRestProxyV2: Unable to update remote "
"network: %s"), e.message)
raise
mapped_network = self._get_mapped_network_with_subnets(network,
context)
net_fl_ips = self._get_network_with_floatingips(mapped_network,
context)
self.servers.rest_update_network(tenant_id, net_id, net_fl_ips)
def _map_state_and_status(self, resource):
resource = copy.copy(resource)

View File

@ -0,0 +1,96 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Big Switch Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kevin Benton, <kevin.benton@bigswitch.com>
#
class HTTPResponseMock():
status = 200
reason = 'OK'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
pass
def read(self):
return "{'status': '200 OK'}"
class HTTPResponseMock404(HTTPResponseMock):
status = 404
reason = 'Not Found'
def read(self):
return "{'status': '404 Not Found'}"
class HTTPResponseMock500(HTTPResponseMock):
status = 500
reason = 'Internal Server Error'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False, errmsg='500 Internal Server Error'):
self.errmsg = errmsg
def read(self):
return "{'status': '%s'}" % self.errmsg
class HTTPConnectionMock():
def __init__(self, server, port, timeout):
self.response = None
self.broken = False
# Port 9000 is the broken server
if port == 9000:
self.broken = True
errmsg = "This server is broken, please try another"
self.response = HTTPResponseMock500(None, errmsg=errmsg)
def request(self, action, uri, body, headers):
if self.broken and "ExceptOnBadServer" in uri:
raise Exception("Broken server got an unexpected request")
if self.response:
return
# detachment may return 404 and plugin shouldn't die
if uri.endswith('attachment') and action == 'DELETE':
self.response = HTTPResponseMock404(None)
else:
self.response = HTTPResponseMock(None)
# Port creations/updates must contain binding information
if ('port' in uri and 'attachment' not in uri
and 'binding' not in body and action in ('POST', 'PUT')):
errmsg = "Port binding info missing in port request '%s'" % body
self.response = HTTPResponseMock500(None, errmsg=errmsg)
return
return
def getresponse(self):
return self.response
def close(self):
pass
class HTTPConnectionMock500(HTTPConnectionMock):
def __init__(self, server, port, timeout):
self.response = HTTPResponseMock500(None)
self.broken = True

View File

@ -22,79 +22,18 @@ from oslo.config import cfg
import webob.exc
import neutron.common.test_lib as test_lib
from neutron import context
from neutron.extensions import portbindings
from neutron.manager import NeutronManager
from neutron.plugins.bigswitch.plugin import RemoteRestError
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.bigswitch import fake_server
from neutron.tests.unit import test_api_v2
import neutron.tests.unit.test_db_plugin as test_plugin
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
class HTTPResponseMock():
status = 200
reason = 'OK'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
pass
def read(self):
return "{'status': '200 OK'}"
class HTTPResponseMock404():
status = 404
reason = 'Not Found'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
pass
def read(self):
return "{'status': '404 Not Found'}"
class HTTPResponseMock500():
status = 500
reason = 'Internal Server Error'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
pass
def read(self):
return "{'status': '500 Internal Server Error'}"
class HTTPConnectionMock():
def __init__(self, server, port, timeout):
if port == 9000:
self.response = HTTPResponseMock500(None)
self.broken = True
else:
self.response = HTTPResponseMock(None)
self.broken = False
def request(self, action, uri, body, headers):
if self.broken:
if "ExceptOnBadServer" in uri:
raise Exception("Broken server got an unexpected request")
return
if uri.endswith('attachment') and action == 'DELETE':
self.response = HTTPResponseMock404(None)
else:
self.response = HTTPResponseMock(None)
return
def getresponse(self):
return self.response
def close(self):
pass
class BigSwitchProxyPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
_plugin_name = ('%s.NeutronRestProxyV2' % RESTPROXY_PKG_PATH)
@ -105,7 +44,7 @@ class BigSwitchProxyPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
'restproxy.ini.test')]
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
new=fake_server.HTTPConnectionMock)
self.addCleanup(self.httpPatch.stop)
self.httpPatch.start()
super(BigSwitchProxyPluginV2TestCase,
@ -138,6 +77,89 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = False
def _get_ports(self, netid):
return self.deserialize('json',
self._list_ports('json', netid=netid))['ports']
def test_rollback_for_port_create(self):
with self.network(no_delete=True) as n:
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
kwargs = {'device_id': 'somedevid',
'tenant_id': n['network']['tenant_id']}
self._create_port('json', n['network']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code,
**kwargs)
self.httpPatch.stop()
ports = self._get_ports(n['network']['id'])
#failure to create should result in no ports
self.assertEqual(0, len(ports))
def test_rollback_on_port_attach(self):
with self.network() as n:
plugin_obj = NeutronManager.get_plugin()
with patch.object(plugin_obj.servers,
'rest_plug_interface') as mock_plug_interface:
mock_plug_interface.side_effect = RemoteRestError('fake error')
kwargs = {'device_id': 'somedevid',
'tenant_id': n['network']['tenant_id']}
self._create_port('json', n['network']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code,
**kwargs)
port = self._get_ports(n['network']['id'])[0]
# Attachment failure should leave created port in error state
self.assertEqual('ERROR', port['status'])
self._delete('ports', port['id'])
def test_rollback_for_port_update(self):
with self.network() as n:
with self.port(network_id=n['network']['id']) as port:
port = self._get_ports(n['network']['id'])[0]
data = {'port': {'name': 'aNewName'}}
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
self.new_update_request('ports',
data,
port['id']).get_response(self.api)
self.httpPatch.stop()
uport = self._get_ports(n['network']['id'])[0]
# name should have stayed the same
self.assertEqual(port['name'], uport['name'])
def test_rollback_for_port_detach(self):
with self.network() as n:
with self.port(network_id=n['network']['id'],
device_id='somedevid') as port:
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
self._delete('ports', port['port']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code)
self.httpPatch.stop()
port = self._get_ports(n['network']['id'])[0]
self.assertEqual('ACTIVE', port['status'])
def test_rollback_for_port_delete(self):
with self.network() as n:
with self.port(network_id=n['network']['id'],
device_id='somdevid') as port:
plugin_obj = NeutronManager.get_plugin()
with patch.object(plugin_obj.servers,
'rest_delete_port'
) as mock_plug_interface:
mock_plug_interface.side_effect = RemoteRestError(
'fake error')
self._delete('ports', port['port']['id'],
expected_code=
webob.exc.HTTPInternalServerError.code)
port = self._get_ports(n['network']['id'])[0]
self.assertEqual('ERROR', port['status'])
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
BigSwitchProxyPluginV2TestCase,
@ -213,7 +235,45 @@ class TestBigSwitchVIFOverride(test_plugin.TestPortsV2,
class TestBigSwitchProxyNetworksV2(test_plugin.TestNetworksV2,
BigSwitchProxyPluginV2TestCase):
pass
def _get_networks(self, tenant_id):
ctx = context.Context('', tenant_id)
return NeutronManager.get_plugin().get_networks(ctx)
def test_rollback_on_network_create(self):
tid = test_api_v2._uuid()
kwargs = {'tenant_id': tid}
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
self._create_network('json', 'netname', True, **kwargs)
self.httpPatch.stop()
self.assertFalse(self._get_networks(tid))
def test_rollback_on_network_update(self):
with self.network() as n:
data = {'network': {'name': 'aNewName'}}
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
self.new_update_request('networks', data,
n['network']['id']).get_response(self.api)
self.httpPatch.stop()
updatedn = self._get_networks(n['network']['tenant_id'])[0]
# name should have stayed the same due to failure
self.assertEqual(n['network']['name'], updatedn['name'])
def test_rollback_on_network_delete(self):
with self.network() as n:
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
self._delete('networks', n['network']['id'],
expected_code=webob.exc.HTTPInternalServerError.code)
self.httpPatch.stop()
# network should still exist in db
self.assertEqual(n['network']['id'],
self._get_networks(n['network']['tenant_id']
)[0]['id'])
class TestBigSwitchProxySubnetsV2(test_plugin.TestSubnetsV2,

View File

@ -26,11 +26,14 @@ from oslo.config import cfg
from webob import exc
from neutron.common.test_lib import test_config
from neutron import context
from neutron.extensions import l3
from neutron.manager import NeutronManager
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common.notifier import test_notifier
from neutron.plugins.bigswitch.extensions import routerrule
from neutron.tests.unit.bigswitch import fake_server
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extension_extradhcpopts as test_extradhcp
from neutron.tests.unit import test_l3_plugin
@ -54,51 +57,6 @@ def new_L3_setUp(self):
origSetUp = test_l3_plugin.L3NatDBTestCase.setUp
class HTTPResponseMock():
status = 200
reason = 'OK'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
pass
def read(self):
return "{'status': '200 OK'}"
class HTTPResponseMock500():
status = 500
reason = 'Internal Server Error'
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False, errmsg='500 Internal Server Error'):
self.errmsg = errmsg
def read(self):
return "{'status': '%s'}" % self.errmsg
class HTTPConnectionMock():
def __init__(self, server, port, timeout):
self.response = None
def request(self, action, uri, body, headers):
self.response = HTTPResponseMock(None)
# Port creations/updates must contain binding information
if ('port' in uri and 'attachment' not in uri
and 'binding' not in body and action in ('POST', 'PUT')):
errmsg = "Port binding info missing in port request '%s'" % body
self.response = HTTPResponseMock500(None, errmsg=errmsg)
return
def getresponse(self):
return self.response
def close(self):
pass
class RouterRulesTestExtensionManager(object):
def get_resources(self):
@ -117,7 +75,7 @@ class DHCPOptsTestCase(test_extradhcp.TestExtraDhcpOpt):
def setUp(self, plugin=None):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
self.addCleanup(self.httpPatch.stop)
p_path = 'neutron.plugins.bigswitch.plugin.NeutronRestProxyV2'
@ -128,7 +86,7 @@ class RouterDBTestCase(test_l3_plugin.L3NatDBTestCase):
def setUp(self):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
test_l3_plugin.L3NatDBTestCase.setUp = new_L3_setUp
super(RouterDBTestCase, self).setUp()
@ -498,6 +456,44 @@ class RouterDBTestCase(test_l3_plugin.L3NatDBTestCase):
{'router': {'router_rules': rules}},
expected_code=exc.HTTPBadRequest.code)
def test_rollback_on_router_create(self):
tid = test_api_v2._uuid()
self.errhttpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.errhttpPatch.start()
self._create_router('json', tid)
self.errhttpPatch.stop()
self.assertTrue(len(self._get_routers(tid)) == 0)
def test_rollback_on_router_update(self):
with self.router() as r:
data = {'router': {'name': 'aNewName'}}
self.errhttpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.errhttpPatch.start()
self.new_update_request('routers', data,
r['router']['id']).get_response(self.api)
self.errhttpPatch.stop()
updatedr = self._get_routers(r['router']['tenant_id'])[0]
# name should have stayed the same due to failure
self.assertEqual(r['router']['name'], updatedr['name'])
def test_rollback_on_router_delete(self):
with self.router() as r:
self.errhttpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.errhttpPatch.start()
self._delete('routers', r['router']['id'],
expected_code=exc.HTTPInternalServerError.code)
self.errhttpPatch.stop()
self.assertEqual(r['router']['id'],
self._get_routers(r['router']['tenant_id']
)[0]['id'])
def _get_routers(self, tenant_id):
ctx = context.Context('', tenant_id)
return self.plugin_obj.get_routers(ctx)
def _strip_rule_ids(rules):
cleaned = []