Merge "NSXv: use contexts correctly while using threads"

This commit is contained in:
Jenkins 2016-09-07 11:52:59 +00:00 committed by Gerrit Code Review
commit c47854dd6b
6 changed files with 74 additions and 63 deletions

View File

@ -42,7 +42,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
availability_zone=availability_zone)
if allow_metadata:
self.plugin.metadata_proxy_handler.configure_router_edge(
lrouter['id'])
lrouter['id'], context)
def update_router(self, context, router_id, router):
r = router['router']
@ -84,7 +84,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
self.edge_manager.unbind_router_on_edge(context, router_id)
metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler:
metadata_proxy_handler.cleanup_router_edge(router_id)
metadata_proxy_handler.cleanup_router_edge(context, router_id)
def _build_router_data_from_db(self, router_db, router):
"""Return a new dictionary with all DB & requested router attributes
@ -135,7 +135,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
def delete_router(self, context, router_id):
if self.plugin.metadata_proxy_handler:
self.plugin.metadata_proxy_handler.cleanup_router_edge(
router_id)
context, router_id)
self.edge_manager.delete_lrouter(context, router_id, dist=False)
def update_routes(self, context, router_id, nexthop):

View File

@ -584,7 +584,8 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
# configure metadata service on the router.
metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler and new:
metadata_proxy_handler.configure_router_edge(router_id)
metadata_proxy_handler.configure_router_edge(router_id,
context)
edge_id = edge_utils.get_router_edge_id(context, router_id)
with locking.LockManager.get_lock(str(edge_id)):
# add all internal interfaces of the router on edge
@ -602,7 +603,7 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
self.edge_manager.unbind_router_on_edge(context, router_id)
metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler:
metadata_proxy_handler.cleanup_router_edge(router_id)
metadata_proxy_handler.cleanup_router_edge(context, router_id)
def _add_router_services_on_available_edge(self, context, router_id):
router_ids = self.edge_manager.get_routers_on_same_edge(

View File

@ -107,16 +107,16 @@ class NsxVMetadataProxyHandler(object):
def __init__(self, nsxv_plugin):
self.nsxv_plugin = nsxv_plugin
self.context = neutron_context.get_admin_context()
context = neutron_context.get_admin_context()
# Init cannot run concurrently on multiple nodes
with locking.LockManager.get_lock('nsx-metadata-init'):
self.internal_net, self.internal_subnet = (
self._get_internal_network_and_subnet())
self._get_internal_network_and_subnet(context))
self.proxy_edge_ips = self._get_proxy_edges()
self.proxy_edge_ips = self._get_proxy_edges(context)
def _create_metadata_internal_network(self, cidr):
def _create_metadata_internal_network(self, context, cidr):
# Neutron requires a network to have some tenant_id
tenant_id = nsxv_constants.INTERNAL_TENANT_ID
@ -125,7 +125,7 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False,
'shared': False,
'tenant_id': tenant_id}}
net = self.nsxv_plugin.create_network(self.context, net_data)
net = self.nsxv_plugin.create_network(context, net_data)
subnet_data = {'subnet':
{'cidr': cidr,
@ -140,18 +140,18 @@ class NsxVMetadataProxyHandler(object):
'tenant_id': tenant_id}}
subnet = self.nsxv_plugin.create_subnet(
self.context,
context,
subnet_data)
return net['id'], subnet['id']
def _get_internal_network_and_subnet(self):
def _get_internal_network_and_subnet(self, context):
internal_net = None
internal_subnet = None
# Try to find internal net, internal subnet. If not found, create new
net_list = nsxv_db.get_nsxv_internal_network(
self.context.session,
context.session,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE)
if net_list:
@ -159,7 +159,7 @@ class NsxVMetadataProxyHandler(object):
if internal_net:
internal_subnet = self.nsxv_plugin.get_subnets(
self.context,
context,
fields=['id'],
filters={'network_id': [internal_net]})[0]['id']
@ -169,15 +169,15 @@ class NsxVMetadataProxyHandler(object):
try:
internal_net, internal_subnet = (
self._create_metadata_internal_network(
INTERNAL_SUBNET))
context, INTERNAL_SUBNET))
except Exception as e:
nsxv_db.delete_nsxv_internal_network(
self.context.session,
context.session,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE)
# if network is created, clean up
if internal_net:
self.nsxv_plugin.delete_network(self.context,
self.nsxv_plugin.delete_network(context,
internal_net)
LOG.exception(_LE("Exception %s while creating internal "
@ -186,7 +186,7 @@ class NsxVMetadataProxyHandler(object):
# Update the new network_id in DB
nsxv_db.create_nsxv_internal_network(
self.context.session,
context.session,
nsxv_constants.INTER_EDGE_PURPOSE,
internal_net)
else:
@ -196,25 +196,23 @@ class NsxVMetadataProxyHandler(object):
return internal_net, internal_subnet
def _get_edge_internal_ip(self, rtr_id):
def _get_edge_internal_ip(self, context, rtr_id):
filters = {
'network_id': [self.internal_net],
'device_id': [rtr_id]}
ports = self.nsxv_plugin.get_ports(self.context, filters=filters)
ports = self.nsxv_plugin.get_ports(context, filters=filters)
if ports:
return ports[0]['fixed_ips'][0]['ip_address']
else:
LOG.error(_LE("No port found for metadata for %s"), rtr_id)
def _get_edge_rtr_id_by_ext_ip(self, edge_ip):
def _get_edge_rtr_id_by_ext_ip(self, context, edge_ip):
rtr_list = nsxv_db.get_nsxv_internal_edge(
self.context.session, edge_ip)
context.session, edge_ip)
if rtr_list:
return rtr_list[0]['router_id']
def _get_edge_id_by_rtr_id(self, rtr_id, context=None):
if not context:
context = self.context
def _get_edge_id_by_rtr_id(self, context, rtr_id):
binding = nsxv_db.get_nsxv_router_binding(
context.session,
rtr_id)
@ -222,10 +220,10 @@ class NsxVMetadataProxyHandler(object):
if binding:
return binding['edge_id']
def _get_proxy_edges(self):
def _get_proxy_edges(self, context):
proxy_edge_ips = []
db_edge_ips = get_db_internal_edge_ips(self.context)
db_edge_ips = get_db_internal_edge_ips(context)
if len(db_edge_ips) > len(cfg.CONF.nsxv.mgt_net_proxy_ips):
error = _('Number of configured metadata proxy IPs is smaller '
'than number of Edges which are already provisioned')
@ -279,10 +277,12 @@ class NsxVMetadataProxyHandler(object):
def _setup_proxy_edge_route_and_connectivity(self, rtr_ext_ip,
rtr_id=None, edge_id=None):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
if not rtr_id:
rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_ext_ip)
rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_ext_ip)
if not edge_id:
edge_id = self._get_edge_id_by_rtr_id(rtr_id)
edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
# Read and validate DGW. If different, replace with new value
try:
@ -300,7 +300,7 @@ class NsxVMetadataProxyHandler(object):
if dgw != cfg.CONF.nsxv.mgt_net_default_gateway:
if cfg.CONF.nsxv.metadata_initializer:
self.nsxv_plugin._update_routes(
self.context, rtr_id,
context, rtr_id,
cfg.CONF.nsxv.mgt_net_default_gateway)
else:
error = _('Metadata initialization is incomplete on '
@ -365,31 +365,37 @@ class NsxVMetadataProxyHandler(object):
"proxy edge %(edge)s: %(err)s"),
{'edge': edge_id, 'err': e})
edge_ip = self._get_edge_internal_ip(rtr_id)
edge_ip = self._get_edge_internal_ip(context, rtr_id)
if edge_ip:
return edge_ip
def _setup_proxy_edge_external_interface_ip(self, rtr_ext_ips):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
rtr_old_ext_ip, rtr_new_ext_ip = rtr_ext_ips
rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_old_ext_ip)
edge_id = self._get_edge_id_by_rtr_id(rtr_id)
rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_old_ext_ip)
edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
# Replace DB entry as we cannot update the table PK
nsxv_db.delete_nsxv_internal_edge(self.context.session, rtr_old_ext_ip)
nsxv_db.delete_nsxv_internal_edge(context.session, rtr_old_ext_ip)
edge_ip = self._setup_proxy_edge_route_and_connectivity(
rtr_new_ext_ip, rtr_id, edge_id)
nsxv_db.create_nsxv_internal_edge(
self.context.session, rtr_new_ext_ip,
context.session, rtr_new_ext_ip,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id)
if edge_ip:
return edge_ip
def _setup_new_proxy_edge(self, rtr_ext_ip):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
rtr_id = None
try:
router_data = {
@ -400,12 +406,12 @@ class NsxVMetadataProxyHandler(object):
'tenant_id': None}}
rtr = self.nsxv_plugin.create_router(
self.context,
context,
router_data,
allow_metadata=False)
rtr_id = rtr['id']
edge_id = self._get_edge_id_by_rtr_id(rtr_id)
edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
self.nsxv_plugin.nsx_v.update_interface(
rtr['id'],
@ -428,14 +434,14 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False,
'tenant_id': None}}
port = self.nsxv_plugin.create_port(self.context, port_data)
port = self.nsxv_plugin.create_port(context, port_data)
address_groups = self._get_address_groups(
self.context, self.internal_net, rtr_id, is_proxy=True)
context, self.internal_net, rtr_id, is_proxy=True)
edge_ip = port['fixed_ips'][0]['ip_address']
edge_utils.update_internal_interface(
self.nsxv_plugin.nsx_v, self.context, rtr_id,
self.nsxv_plugin.nsx_v, context, rtr_id,
self.internal_net, address_groups)
self._setup_metadata_lb(rtr_id,
@ -454,18 +460,18 @@ class NsxVMetadataProxyHandler(object):
edge_utils.update_firewall(
self.nsxv_plugin.nsx_v,
self.context,
context,
rtr_id,
{'firewall_rule_list': firewall_rules},
allow_external=False)
if cfg.CONF.nsxv.mgt_net_default_gateway:
self.nsxv_plugin._update_routes(
self.context, rtr_id,
context, rtr_id,
cfg.CONF.nsxv.mgt_net_default_gateway)
nsxv_db.create_nsxv_internal_edge(
self.context.session, rtr_ext_ip,
context.session, rtr_ext_ip,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id)
return edge_ip
@ -475,19 +481,19 @@ class NsxVMetadataProxyHandler(object):
"for metadata service"), e)
ports = self.nsxv_plugin.get_ports(
self.context, filters={'device_id': [rtr_id]})
context, filters={'device_id': [rtr_id]})
for port in ports:
self.nsxv_plugin.delete_port(self.context, port['id'],
self.nsxv_plugin.delete_port(context, port['id'],
l3_port_check=True,
nw_gw_port_check=True)
nsxv_db.delete_nsxv_internal_edge(
self.context.session,
context.session,
rtr_ext_ip)
if rtr_id:
self.nsxv_plugin.delete_router(self.context, rtr_id)
self.nsxv_plugin.delete_router(context, rtr_id)
def _get_address_groups(self, context, network_id, device_id, is_proxy):
@ -559,9 +565,9 @@ class NsxVMetadataProxyHandler(object):
proxy_lb=False, context=None):
if context is None:
context = self.context
context = neutron_context.get_admin_context()
edge_id = self._get_edge_id_by_rtr_id(rtr_id, context=context)
edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
LOG.debug('Setting up Edge device %s', edge_id)
lb_obj = nsxv_lb.NsxvLoadbalancer()
@ -641,7 +647,8 @@ class NsxVMetadataProxyHandler(object):
lb_obj.submit_to_backend(self.nsxv_plugin.nsx_v.vcns, edge_id)
def configure_router_edge(self, rtr_id, context=None):
def configure_router_edge(self, rtr_id, context):
ctx = context.elevated()
# Connect router interface to inter-edge network
port_data = {
'port': {
@ -655,10 +662,10 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False,
'tenant_id': None}}
self.nsxv_plugin.create_port(self.context, port_data)
self.nsxv_plugin.create_port(ctx, port_data)
address_groups = self._get_address_groups(
self.context,
ctx,
self.internal_net,
rtr_id,
is_proxy=False)
@ -681,11 +688,11 @@ class NsxVMetadataProxyHandler(object):
proxy_lb=False,
context=context)
def cleanup_router_edge(self, rtr_id, warn=False):
def cleanup_router_edge(self, context, rtr_id, warn=False):
filters = {
'network_id': [self.internal_net],
'device_id': [rtr_id]}
ports = self.nsxv_plugin.get_ports(self.context, filters=filters)
ports = self.nsxv_plugin.get_ports(context, filters=filters)
if ports:
if warn:
@ -694,7 +701,7 @@ class NsxVMetadataProxyHandler(object):
{'port': ports[0]['id'], 'router': rtr_id})
try:
self.nsxv_plugin.delete_port(
self.context, ports[0]['id'],
context, ports[0]['id'],
l3_port_check=False)
except Exception as e:
LOG.error(_LE("Failed to delete md_proxy port %(port)s: "

View File

@ -1072,7 +1072,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context.session, dhcp_edge['edge_id'])
if rtr_binding:
rtr_id = rtr_binding['router_id']
self.metadata_proxy_handler.cleanup_router_edge(rtr_id)
self.metadata_proxy_handler.cleanup_router_edge(
context, rtr_id)
def _update_dhcp_edge_service(self, context, network_id, address_groups):
self.edge_manager.update_dhcp_edge_service(

View File

@ -158,6 +158,9 @@ class EdgeManager(object):
edge_type=nsxv_constants.SERVICE_EDGE,
availability_zone=None):
"""Create an edge for logical router support."""
if context is None:
context = q_context.get_admin_context()
# deploy edge
self.nsxv_manager.deploy_edge(context, lrouter['id'],
lrouter['name'], internal_network=None,
@ -186,16 +189,16 @@ class EdgeManager(object):
appliance_size=nsxv_constants.COMPACT,
edge_type=nsxv_constants.SERVICE_EDGE,
availability_zone=None):
eventlet.spawn_n(self._pool_creator, context, router_ids,
appliance_size, edge_type, availability_zone)
eventlet.spawn_n(self._pool_creator, router_ids, appliance_size,
edge_type, availability_zone)
def _pool_creator(self, context, router_ids, appliance_size,
edge_type, availability_zone):
def _pool_creator(self, router_ids, appliance_size, edge_type,
availability_zone):
for router_id in router_ids:
fake_router = {
'id': router_id,
'name': router_id}
self.worker_pool.spawn_n(self._deploy_edge, context, fake_router,
self.worker_pool.spawn_n(self._deploy_edge, None, fake_router,
appliance_size=appliance_size,
edge_type=edge_type,
availability_zone=availability_zone)

View File

@ -588,8 +588,7 @@ class EdgeManagerTestCase(EdgeUtilsTestCaseMixin):
binding_ids = [bind.router_id for bind in router_bindings]
self.assertEqual(2, len(router_bindings))
edge_utils.eventlet.spawn_n.assert_called_with(
mock.ANY, mock.ANY, binding_ids, appliance_size,
edge_type, self.az)
mock.ANY, binding_ids, appliance_size, edge_type, self.az)
def test_check_backup_edge_pools_with_empty_conf(self):
pool_edges = (self._create_edge_pools(1, 2, 3, 4, 5) +