From fe72c1bd0c98bc5c403e867a4c284f8e66a4d10d Mon Sep 17 00:00:00 2001 From: Kobi Samoray Date: Tue, 6 Sep 2016 18:03:22 +0300 Subject: [PATCH] NSXv: use contexts correctly while using threads Contexts are not thread safe and therefore, methods which are called as thread entry point should create their own context, and pass to any called methods which are using contexts. Change-Id: Ia8629c211807972d228358893a7b787c55b5be7f --- .../nsx_v/drivers/exclusive_router_driver.py | 6 +- .../nsx_v/drivers/shared_router_driver.py | 5 +- vmware_nsx/plugins/nsx_v/md_proxy.py | 107 ++++++++++-------- vmware_nsx/plugins/nsx_v/plugin.py | 3 +- .../plugins/nsx_v/vshield/edge_utils.py | 13 ++- .../unit/nsx_v/vshield/test_edge_utils.py | 3 +- 6 files changed, 74 insertions(+), 63 deletions(-) diff --git a/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py b/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py index bd0884ef37..7ec9484752 100644 --- a/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py +++ b/vmware_nsx/plugins/nsx_v/drivers/exclusive_router_driver.py @@ -42,7 +42,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver): availability_zone=availability_zone) if allow_metadata: self.plugin.metadata_proxy_handler.configure_router_edge( - lrouter['id']) + lrouter['id'], context) def update_router(self, context, router_id, router): r = router['router'] @@ -84,7 +84,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver): self.edge_manager.unbind_router_on_edge(context, router_id) metadata_proxy_handler = self.plugin.metadata_proxy_handler if metadata_proxy_handler: - metadata_proxy_handler.cleanup_router_edge(router_id) + metadata_proxy_handler.cleanup_router_edge(context, router_id) def _build_router_data_from_db(self, router_db, router): """Return a new dictionary with all DB & requested router attributes @@ -135,7 +135,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver): def delete_router(self, context, router_id): if self.plugin.metadata_proxy_handler: self.plugin.metadata_proxy_handler.cleanup_router_edge( - router_id) + context, router_id) self.edge_manager.delete_lrouter(context, router_id, dist=False) def update_routes(self, context, router_id, nexthop): diff --git a/vmware_nsx/plugins/nsx_v/drivers/shared_router_driver.py b/vmware_nsx/plugins/nsx_v/drivers/shared_router_driver.py index 3f7f8a492e..dc98484eed 100644 --- a/vmware_nsx/plugins/nsx_v/drivers/shared_router_driver.py +++ b/vmware_nsx/plugins/nsx_v/drivers/shared_router_driver.py @@ -584,7 +584,8 @@ class RouterSharedDriver(router_driver.RouterBaseDriver): # configure metadata service on the router. metadata_proxy_handler = self.plugin.metadata_proxy_handler if metadata_proxy_handler and new: - metadata_proxy_handler.configure_router_edge(router_id) + metadata_proxy_handler.configure_router_edge(router_id, + context) edge_id = edge_utils.get_router_edge_id(context, router_id) with locking.LockManager.get_lock(str(edge_id)): # add all internal interfaces of the router on edge @@ -602,7 +603,7 @@ class RouterSharedDriver(router_driver.RouterBaseDriver): self.edge_manager.unbind_router_on_edge(context, router_id) metadata_proxy_handler = self.plugin.metadata_proxy_handler if metadata_proxy_handler: - metadata_proxy_handler.cleanup_router_edge(router_id) + metadata_proxy_handler.cleanup_router_edge(context, router_id) def _add_router_services_on_available_edge(self, context, router_id): router_ids = self.edge_manager.get_routers_on_same_edge( diff --git a/vmware_nsx/plugins/nsx_v/md_proxy.py b/vmware_nsx/plugins/nsx_v/md_proxy.py index 5cc001e83e..14fef083a3 100644 --- a/vmware_nsx/plugins/nsx_v/md_proxy.py +++ b/vmware_nsx/plugins/nsx_v/md_proxy.py @@ -107,16 +107,16 @@ class NsxVMetadataProxyHandler(object): def __init__(self, nsxv_plugin): self.nsxv_plugin = nsxv_plugin - self.context = neutron_context.get_admin_context() + context = neutron_context.get_admin_context() # Init cannot run concurrently on multiple nodes with locking.LockManager.get_lock('nsx-metadata-init'): self.internal_net, self.internal_subnet = ( - self._get_internal_network_and_subnet()) + self._get_internal_network_and_subnet(context)) - self.proxy_edge_ips = self._get_proxy_edges() + self.proxy_edge_ips = self._get_proxy_edges(context) - def _create_metadata_internal_network(self, cidr): + def _create_metadata_internal_network(self, context, cidr): # Neutron requires a network to have some tenant_id tenant_id = nsxv_constants.INTERNAL_TENANT_ID @@ -125,7 +125,7 @@ class NsxVMetadataProxyHandler(object): 'port_security_enabled': False, 'shared': False, 'tenant_id': tenant_id}} - net = self.nsxv_plugin.create_network(self.context, net_data) + net = self.nsxv_plugin.create_network(context, net_data) subnet_data = {'subnet': {'cidr': cidr, @@ -140,18 +140,18 @@ class NsxVMetadataProxyHandler(object): 'tenant_id': tenant_id}} subnet = self.nsxv_plugin.create_subnet( - self.context, + context, subnet_data) return net['id'], subnet['id'] - def _get_internal_network_and_subnet(self): + def _get_internal_network_and_subnet(self, context): internal_net = None internal_subnet = None # Try to find internal net, internal subnet. If not found, create new net_list = nsxv_db.get_nsxv_internal_network( - self.context.session, + context.session, vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE) if net_list: @@ -159,7 +159,7 @@ class NsxVMetadataProxyHandler(object): if internal_net: internal_subnet = self.nsxv_plugin.get_subnets( - self.context, + context, fields=['id'], filters={'network_id': [internal_net]})[0]['id'] @@ -169,15 +169,15 @@ class NsxVMetadataProxyHandler(object): try: internal_net, internal_subnet = ( self._create_metadata_internal_network( - INTERNAL_SUBNET)) + context, INTERNAL_SUBNET)) except Exception as e: nsxv_db.delete_nsxv_internal_network( - self.context.session, + context.session, vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE) # if network is created, clean up if internal_net: - self.nsxv_plugin.delete_network(self.context, + self.nsxv_plugin.delete_network(context, internal_net) LOG.exception(_LE("Exception %s while creating internal " @@ -186,7 +186,7 @@ class NsxVMetadataProxyHandler(object): # Update the new network_id in DB nsxv_db.create_nsxv_internal_network( - self.context.session, + context.session, nsxv_constants.INTER_EDGE_PURPOSE, internal_net) else: @@ -196,25 +196,23 @@ class NsxVMetadataProxyHandler(object): return internal_net, internal_subnet - def _get_edge_internal_ip(self, rtr_id): + def _get_edge_internal_ip(self, context, rtr_id): filters = { 'network_id': [self.internal_net], 'device_id': [rtr_id]} - ports = self.nsxv_plugin.get_ports(self.context, filters=filters) + ports = self.nsxv_plugin.get_ports(context, filters=filters) if ports: return ports[0]['fixed_ips'][0]['ip_address'] else: LOG.error(_LE("No port found for metadata for %s"), rtr_id) - def _get_edge_rtr_id_by_ext_ip(self, edge_ip): + def _get_edge_rtr_id_by_ext_ip(self, context, edge_ip): rtr_list = nsxv_db.get_nsxv_internal_edge( - self.context.session, edge_ip) + context.session, edge_ip) if rtr_list: return rtr_list[0]['router_id'] - def _get_edge_id_by_rtr_id(self, rtr_id, context=None): - if not context: - context = self.context + def _get_edge_id_by_rtr_id(self, context, rtr_id): binding = nsxv_db.get_nsxv_router_binding( context.session, rtr_id) @@ -222,10 +220,10 @@ class NsxVMetadataProxyHandler(object): if binding: return binding['edge_id'] - def _get_proxy_edges(self): + def _get_proxy_edges(self, context): proxy_edge_ips = [] - db_edge_ips = get_db_internal_edge_ips(self.context) + db_edge_ips = get_db_internal_edge_ips(context) if len(db_edge_ips) > len(cfg.CONF.nsxv.mgt_net_proxy_ips): error = _('Number of configured metadata proxy IPs is smaller ' 'than number of Edges which are already provisioned') @@ -279,10 +277,12 @@ class NsxVMetadataProxyHandler(object): def _setup_proxy_edge_route_and_connectivity(self, rtr_ext_ip, rtr_id=None, edge_id=None): + # Use separate context per each as we use this in tread context + context = neutron_context.get_admin_context() if not rtr_id: - rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_ext_ip) + rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_ext_ip) if not edge_id: - edge_id = self._get_edge_id_by_rtr_id(rtr_id) + edge_id = self._get_edge_id_by_rtr_id(context, rtr_id) # Read and validate DGW. If different, replace with new value try: @@ -300,7 +300,7 @@ class NsxVMetadataProxyHandler(object): if dgw != cfg.CONF.nsxv.mgt_net_default_gateway: if cfg.CONF.nsxv.metadata_initializer: self.nsxv_plugin._update_routes( - self.context, rtr_id, + context, rtr_id, cfg.CONF.nsxv.mgt_net_default_gateway) else: error = _('Metadata initialization is incomplete on ' @@ -365,31 +365,37 @@ class NsxVMetadataProxyHandler(object): "proxy edge %(edge)s: %(err)s"), {'edge': edge_id, 'err': e}) - edge_ip = self._get_edge_internal_ip(rtr_id) + edge_ip = self._get_edge_internal_ip(context, rtr_id) if edge_ip: return edge_ip def _setup_proxy_edge_external_interface_ip(self, rtr_ext_ips): + # Use separate context per each as we use this in tread context + context = neutron_context.get_admin_context() + rtr_old_ext_ip, rtr_new_ext_ip = rtr_ext_ips - rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_old_ext_ip) - edge_id = self._get_edge_id_by_rtr_id(rtr_id) + rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_old_ext_ip) + edge_id = self._get_edge_id_by_rtr_id(context, rtr_id) # Replace DB entry as we cannot update the table PK - nsxv_db.delete_nsxv_internal_edge(self.context.session, rtr_old_ext_ip) + nsxv_db.delete_nsxv_internal_edge(context.session, rtr_old_ext_ip) edge_ip = self._setup_proxy_edge_route_and_connectivity( rtr_new_ext_ip, rtr_id, edge_id) nsxv_db.create_nsxv_internal_edge( - self.context.session, rtr_new_ext_ip, + context.session, rtr_new_ext_ip, vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id) if edge_ip: return edge_ip def _setup_new_proxy_edge(self, rtr_ext_ip): + # Use separate context per each as we use this in tread context + context = neutron_context.get_admin_context() + rtr_id = None try: router_data = { @@ -400,12 +406,12 @@ class NsxVMetadataProxyHandler(object): 'tenant_id': None}} rtr = self.nsxv_plugin.create_router( - self.context, + context, router_data, allow_metadata=False) rtr_id = rtr['id'] - edge_id = self._get_edge_id_by_rtr_id(rtr_id) + edge_id = self._get_edge_id_by_rtr_id(context, rtr_id) self.nsxv_plugin.nsx_v.update_interface( rtr['id'], @@ -428,14 +434,14 @@ class NsxVMetadataProxyHandler(object): 'port_security_enabled': False, 'tenant_id': None}} - port = self.nsxv_plugin.create_port(self.context, port_data) + port = self.nsxv_plugin.create_port(context, port_data) address_groups = self._get_address_groups( - self.context, self.internal_net, rtr_id, is_proxy=True) + context, self.internal_net, rtr_id, is_proxy=True) edge_ip = port['fixed_ips'][0]['ip_address'] edge_utils.update_internal_interface( - self.nsxv_plugin.nsx_v, self.context, rtr_id, + self.nsxv_plugin.nsx_v, context, rtr_id, self.internal_net, address_groups) self._setup_metadata_lb(rtr_id, @@ -454,18 +460,18 @@ class NsxVMetadataProxyHandler(object): edge_utils.update_firewall( self.nsxv_plugin.nsx_v, - self.context, + context, rtr_id, {'firewall_rule_list': firewall_rules}, allow_external=False) if cfg.CONF.nsxv.mgt_net_default_gateway: self.nsxv_plugin._update_routes( - self.context, rtr_id, + context, rtr_id, cfg.CONF.nsxv.mgt_net_default_gateway) nsxv_db.create_nsxv_internal_edge( - self.context.session, rtr_ext_ip, + context.session, rtr_ext_ip, vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id) return edge_ip @@ -475,19 +481,19 @@ class NsxVMetadataProxyHandler(object): "for metadata service"), e) ports = self.nsxv_plugin.get_ports( - self.context, filters={'device_id': [rtr_id]}) + context, filters={'device_id': [rtr_id]}) for port in ports: - self.nsxv_plugin.delete_port(self.context, port['id'], + self.nsxv_plugin.delete_port(context, port['id'], l3_port_check=True, nw_gw_port_check=True) nsxv_db.delete_nsxv_internal_edge( - self.context.session, + context.session, rtr_ext_ip) if rtr_id: - self.nsxv_plugin.delete_router(self.context, rtr_id) + self.nsxv_plugin.delete_router(context, rtr_id) def _get_address_groups(self, context, network_id, device_id, is_proxy): @@ -559,9 +565,9 @@ class NsxVMetadataProxyHandler(object): proxy_lb=False, context=None): if context is None: - context = self.context + context = neutron_context.get_admin_context() - edge_id = self._get_edge_id_by_rtr_id(rtr_id, context=context) + edge_id = self._get_edge_id_by_rtr_id(context, rtr_id) LOG.debug('Setting up Edge device %s', edge_id) lb_obj = nsxv_lb.NsxvLoadbalancer() @@ -641,7 +647,8 @@ class NsxVMetadataProxyHandler(object): lb_obj.submit_to_backend(self.nsxv_plugin.nsx_v.vcns, edge_id) - def configure_router_edge(self, rtr_id, context=None): + def configure_router_edge(self, rtr_id, context): + ctx = context.elevated() # Connect router interface to inter-edge network port_data = { 'port': { @@ -655,10 +662,10 @@ class NsxVMetadataProxyHandler(object): 'port_security_enabled': False, 'tenant_id': None}} - self.nsxv_plugin.create_port(self.context, port_data) + self.nsxv_plugin.create_port(ctx, port_data) address_groups = self._get_address_groups( - self.context, + ctx, self.internal_net, rtr_id, is_proxy=False) @@ -681,11 +688,11 @@ class NsxVMetadataProxyHandler(object): proxy_lb=False, context=context) - def cleanup_router_edge(self, rtr_id, warn=False): + def cleanup_router_edge(self, context, rtr_id, warn=False): filters = { 'network_id': [self.internal_net], 'device_id': [rtr_id]} - ports = self.nsxv_plugin.get_ports(self.context, filters=filters) + ports = self.nsxv_plugin.get_ports(context, filters=filters) if ports: if warn: @@ -694,7 +701,7 @@ class NsxVMetadataProxyHandler(object): {'port': ports[0]['id'], 'router': rtr_id}) try: self.nsxv_plugin.delete_port( - self.context, ports[0]['id'], + context, ports[0]['id'], l3_port_check=False) except Exception as e: LOG.error(_LE("Failed to delete md_proxy port %(port)s: " diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 7b28d3e2cf..2b1351b388 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -1052,7 +1052,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, context.session, dhcp_edge['edge_id']) if rtr_binding: rtr_id = rtr_binding['router_id'] - self.metadata_proxy_handler.cleanup_router_edge(rtr_id) + self.metadata_proxy_handler.cleanup_router_edge( + context, rtr_id) def _update_dhcp_edge_service(self, context, network_id, address_groups): self.edge_manager.update_dhcp_edge_service( diff --git a/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py b/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py index bea803de8e..153db04ece 100644 --- a/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py +++ b/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py @@ -158,6 +158,9 @@ class EdgeManager(object): edge_type=nsxv_constants.SERVICE_EDGE, availability_zone=None): """Create an edge for logical router support.""" + if context is None: + context = q_context.get_admin_context() + # deploy edge self.nsxv_manager.deploy_edge(context, lrouter['id'], lrouter['name'], internal_network=None, @@ -186,16 +189,16 @@ class EdgeManager(object): appliance_size=nsxv_constants.COMPACT, edge_type=nsxv_constants.SERVICE_EDGE, availability_zone=None): - eventlet.spawn_n(self._pool_creator, context, router_ids, - appliance_size, edge_type, availability_zone) + eventlet.spawn_n(self._pool_creator, router_ids, appliance_size, + edge_type, availability_zone) - def _pool_creator(self, context, router_ids, appliance_size, - edge_type, availability_zone): + def _pool_creator(self, router_ids, appliance_size, edge_type, + availability_zone): for router_id in router_ids: fake_router = { 'id': router_id, 'name': router_id} - self.worker_pool.spawn_n(self._deploy_edge, context, fake_router, + self.worker_pool.spawn_n(self._deploy_edge, None, fake_router, appliance_size=appliance_size, edge_type=edge_type, availability_zone=availability_zone) diff --git a/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py b/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py index 332c07bba1..282d821fe1 100644 --- a/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py +++ b/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py @@ -588,8 +588,7 @@ class EdgeManagerTestCase(EdgeUtilsTestCaseMixin): binding_ids = [bind.router_id for bind in router_bindings] self.assertEqual(2, len(router_bindings)) edge_utils.eventlet.spawn_n.assert_called_with( - mock.ANY, mock.ANY, binding_ids, appliance_size, - edge_type, self.az) + mock.ANY, binding_ids, appliance_size, edge_type, self.az) def test_check_backup_edge_pools_with_empty_conf(self): pool_edges = (self._create_edge_pools(1, 2, 3, 4, 5) +